diff --git a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/DataStreamFeatures.java b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/DataStreamFeatures.java
index 734c10570ab2b..06dc8919360f8 100644
--- a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/DataStreamFeatures.java
+++ b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/DataStreamFeatures.java
@@ -9,6 +9,7 @@
 package org.elasticsearch.datastreams;
 
 import org.elasticsearch.action.admin.indices.rollover.LazyRolloverAction;
+import org.elasticsearch.action.datastreams.autosharding.DataStreamAutoShardingService;
 import org.elasticsearch.datastreams.lifecycle.health.DataStreamLifecycleHealthInfoPublisher;
 import org.elasticsearch.features.FeatureSpecification;
 import org.elasticsearch.features.NodeFeature;
@@ -24,7 +25,8 @@ public class DataStreamFeatures implements FeatureSpecification {
     public Set<NodeFeature> getFeatures() {
         return Set.of(
             DataStreamLifecycleHealthInfoPublisher.DSL_HEALTH_INFO_FEATURE, // Added in 8.12
-            LazyRolloverAction.DATA_STREAM_LAZY_ROLLOVER // Added in 8.13
+            LazyRolloverAction.DATA_STREAM_LAZY_ROLLOVER, // Added in 8.13
+            DataStreamAutoShardingService.DATA_STREAM_AUTO_SHARDING_FEATURE
         );
     }
 }
diff --git a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/DataStreamsPlugin.java b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/DataStreamsPlugin.java
index 4cebba155518b..4694d95964013 100644
--- a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/DataStreamsPlugin.java
+++ b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/DataStreamsPlugin.java
@@ -18,6 +18,7 @@
 import org.elasticsearch.action.datastreams.MigrateToDataStreamAction;
 import org.elasticsearch.action.datastreams.ModifyDataStreamsAction;
 import org.elasticsearch.action.datastreams.PromoteDataStreamAction;
+import org.elasticsearch.action.datastreams.autosharding.DataStreamAutoShardingService;
 import org.elasticsearch.client.internal.OriginSettingClient;
 import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
 import org.elasticsearch.cluster.node.DiscoveryNodes;
@@ -132,6 +133,7 @@ public static TimeValue getLookAheadTime(Settings settings) {
     private final SetOnce<DataStreamLifecycleService> dataLifecycleInitialisationService = new SetOnce<>();
     private final SetOnce<DataStreamLifecycleHealthInfoPublisher> dataStreamLifecycleErrorsPublisher = new SetOnce<>();
     private final SetOnce<DataStreamLifecycleHealthIndicatorService> dataStreamLifecycleHealthIndicatorService = new SetOnce<>();
+    private final SetOnce<DataStreamAutoShardingService> dataStreamAutoShardingServiceSetOnce = new SetOnce<>();
     private final Settings settings;
 
     public DataStreamsPlugin(Settings settings) {
@@ -166,6 +168,11 @@ public List<Setting<?>> getSettings() {
         pluginSettings.add(DataStreamLifecycleService.DATA_STREAM_MERGE_POLICY_TARGET_FLOOR_SEGMENT_SETTING);
         pluginSettings.add(DataStreamLifecycleService.DATA_STREAM_MERGE_POLICY_TARGET_FACTOR_SETTING);
         pluginSettings.add(DataStreamLifecycleService.DATA_STREAM_SIGNALLING_ERROR_RETRY_INTERVAL_SETTING);
+        pluginSettings.add(DataStreamAutoShardingService.DATA_STREAMS_AUTO_SHARDING_EXCLUDES_SETTING);
+        pluginSettings.add(DataStreamAutoShardingService.DATA_STREAMS_AUTO_SHARDING_INCREASE_SHARDS_COOLDOWN);
+        pluginSettings.add(DataStreamAutoShardingService.DATA_STREAMS_AUTO_SHARDING_DECREASE_SHARDS_COOLDOWN);
+        pluginSettings.add(DataStreamAutoShardingService.CLUSTER_AUTO_SHARDING_MIN_NUMBER_WRITE_THREADS);
+        pluginSettings.add(DataStreamAutoShardingService.CLUSTER_AUTO_SHARDING_MAX_NUMBER_WRITE_THREADS);
         return pluginSettings;
     }
 
@@ -206,9 +213,19 @@ public Collection<?> createComponents(PluginServices services) {
         dataLifecycleInitialisationService.get().init();
         dataStreamLifecycleHealthIndicatorService.set(new DataStreamLifecycleHealthIndicatorService());
+        dataStreamAutoShardingServiceSetOnce.set(
+            new DataStreamAutoShardingService(
+                settings,
+                services.clusterService(),
+                services.featureService(),
+                services.threadPool()::absoluteTimeInMillis
+            )
+        );
+
         components.add(errorStoreInitialisationService.get());
         components.add(dataLifecycleInitialisationService.get());
         components.add(dataStreamLifecycleErrorsPublisher.get());
+        components.add(dataStreamAutoShardingServiceSetOnce.get());
         return components;
     }
 
diff --git a/modules/data-streams/src/test/java/org/elasticsearch/datastreams/DataStreamGetWriteIndexTests.java b/modules/data-streams/src/test/java/org/elasticsearch/datastreams/DataStreamGetWriteIndexTests.java
index d0b41c847a61d..b61cbdc837010 100644
--- a/modules/data-streams/src/test/java/org/elasticsearch/datastreams/DataStreamGetWriteIndexTests.java
+++ b/modules/data-streams/src/test/java/org/elasticsearch/datastreams/DataStreamGetWriteIndexTests.java
@@ -318,7 +318,7 @@ private MetadataRolloverService.RolloverResult rolloverOver(ClusterState state,
         MaxDocsCondition condition = new MaxDocsCondition(randomNonNegativeLong());
         List<Condition<?>> metConditions = Collections.singletonList(condition);
         CreateIndexRequest createIndexRequest = new CreateIndexRequest("_na_");
-        return rolloverService.rolloverClusterState(state, name, null, createIndexRequest, metConditions, time, false, false, null);
+        return rolloverService.rolloverClusterState(state, name, null, createIndexRequest, metConditions, time, false, false, null, null);
     }
 
     private Index getWriteIndex(ClusterState state, String name, String timestamp) {
diff --git a/modules/data-streams/src/test/java/org/elasticsearch/datastreams/MetadataDataStreamRolloverServiceTests.java b/modules/data-streams/src/test/java/org/elasticsearch/datastreams/MetadataDataStreamRolloverServiceTests.java
index 4f2df2c690bc8..2bfbeb8e37aaf 100644
--- a/modules/data-streams/src/test/java/org/elasticsearch/datastreams/MetadataDataStreamRolloverServiceTests.java
+++ b/modules/data-streams/src/test/java/org/elasticsearch/datastreams/MetadataDataStreamRolloverServiceTests.java
@@ -119,7 +119,8 @@ public void testRolloverClusterStateForDataStream() throws Exception {
             now,
             randomBoolean(),
             false,
-            indexStats
+            indexStats,
+            null
         );
         long after = testThreadPool.absoluteTimeInMillis();
@@ -218,6 +219,7 @@ public void testRolloverAndMigrateDataStream() throws Exception {
             now,
             randomBoolean(),
             false,
+            null,
             null
         );
 
@@ -310,6 +312,7 @@ public void testChangingIndexModeFromTimeSeriesToSomethingElseNoEffectOnExisting
             now,
             randomBoolean(),
             false,
+            null,
             null
         );
 
@@ -375,7 +378,8 @@ public void testRolloverClusterStateWithBrokenOlderTsdbDataStream() throws Excep
             now,
             randomBoolean(),
             false,
-            indexStats
+            indexStats,
+            null
         );
         long after = testThreadPool.absoluteTimeInMillis();
@@ -455,7 +459,8 @@ public void testRolloverClusterStateWithBrokenTsdbDataStream() throws Exception
                 now,
                 randomBoolean(),
                 false,
-                indexStats
+                indexStats,
+                null
             )
         );
         assertThat(e.getMessage(), containsString("is overlapping with backing index"));
diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/rollover/AutoShardingCondition.java b/server/src/main/java/org/elasticsearch/action/admin/indices/rollover/AutoShardingCondition.java
new file mode 100644
index 0000000000000..ce758efaacfa3
--- /dev/null
+++ b/server/src/main/java/org/elasticsearch/action/admin/indices/rollover/AutoShardingCondition.java
@@ -0,0 +1,96 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.action.admin.indices.rollover;
+
+import org.elasticsearch.TransportVersion;
+import org.elasticsearch.action.datastreams.autosharding.DataStreamAutoShardingService.AutoShardingResult;
+import org.elasticsearch.cluster.metadata.DataStream;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.core.TimeValue;
+import org.elasticsearch.xcontent.XContentBuilder;
+import org.elasticsearch.xcontent.XContentParser;
+
+import java.io.IOException;
+
+import static org.elasticsearch.action.datastreams.autosharding.DataStreamAutoShardingService.AutoShardingResult.CURRENT_NUMBER_OF_SHARDS;
+import static org.elasticsearch.action.datastreams.autosharding.DataStreamAutoShardingService.AutoShardingResult.TARGET_NUMBER_OF_SHARDS;
+import static org.elasticsearch.action.datastreams.autosharding.DataStreamAutoShardingService.AutoShardingResult.WRITE_LOAD;
+import static org.elasticsearch.action.datastreams.autosharding.DataStreamAutoShardingService.AutoShardingType.INCREASE_NUMBER_OF_SHARDS;
+
+/**
+ * Condition for automatically increasing the number of shards for a data stream. The value is computed when the condition is
+ * evaluated.
+ */
+public class AutoShardingCondition extends Condition<AutoShardingResult> {
+    public static final String NAME = "auto_sharding";
+    private boolean isConditionMet;
+
+    public AutoShardingCondition(AutoShardingResult autoShardingResult) {
+        super(NAME, Type.AUTOMATIC);
+        this.value = autoShardingResult;
+        this.isConditionMet = (value.type() == INCREASE_NUMBER_OF_SHARDS && value.coolDownRemaining().equals(TimeValue.ZERO));
+    }
+
+    public AutoShardingCondition(StreamInput in) throws IOException {
+        super(NAME, Type.AUTOMATIC);
+        this.value = new AutoShardingResult(in);
+    }
+
+    @Override
+    public Result evaluate(final Stats stats) {
+        return new Result(this, isConditionMet);
+    }
+
+    @Override
+    public String getWriteableName() {
+        return NAME;
+    }
+
+    @Override
+    public void writeTo(StreamOutput out) throws IOException {
+        value.writeTo(out);
+    }
+
+    @Override
+    public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
+        // we only save this representation in the cluster state as part of meet_conditions when this condition is met
+        if (isConditionMet) {
+            builder.startObject(NAME);
+            builder.field(CURRENT_NUMBER_OF_SHARDS.getPreferredName(), value.currentNumberOfShards());
+            builder.field(TARGET_NUMBER_OF_SHARDS.getPreferredName(), value.targetNumberOfShards());
+            assert value.writeLoad() != null
+                : "when the condition matches, a change in number of shards is executed and a write load must be present";
+            builder.field(WRITE_LOAD.getPreferredName(), value.writeLoad());
+            builder.endObject();
+        }
+        return builder;
+    }
+
+    public static AutoShardingCondition fromXContent(XContentParser parser) throws IOException {
+        if (parser.nextToken() == XContentParser.Token.START_OBJECT) {
+            return new AutoShardingCondition(
+                new AutoShardingResult(
+                    INCREASE_NUMBER_OF_SHARDS,
+                    parser.intValue(),
+                    parser.intValue(),
+                    TimeValue.ZERO,
+                    parser.doubleValue()
+                )
+            );
+        } else {
+            throw new IllegalArgumentException("invalid token when parsing " + NAME + " condition: " + parser.currentToken());
+        }
+    }
+
+    @Override
+    boolean includedInVersion(TransportVersion version) {
+        return version.onOrAfter(DataStream.ADDED_AUTO_SHARDING_EVENT_VERSION);
+    }
+}
diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/rollover/Condition.java b/server/src/main/java/org/elasticsearch/action/admin/indices/rollover/Condition.java
index ba7d6b03043c5..b4a466dc9aa1e 100644
--- a/server/src/main/java/org/elasticsearch/action/admin/indices/rollover/Condition.java
+++ b/server/src/main/java/org/elasticsearch/action/admin/indices/rollover/Condition.java
@@ -20,12 +20,14 @@
  */
 public abstract class Condition<T> implements NamedWriteable, ToXContentFragment {
 
-    /**
-     * Describes the type of condition - a min_* condition (MIN) or max_* condition (MAX).
+    /*
+     * Describes the type of condition - a min_* condition (MIN), max_* condition (MAX), or an automatic condition (automatic conditions
+     * are something that the platform configures and manages)
      */
     public enum Type {
         MIN,
-        MAX
+        MAX,
+        AUTOMATIC
     }
 
     protected T value;
diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/rollover/LazyRolloverAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/rollover/LazyRolloverAction.java
index 9266a320f598c..623186e052eb7 100644
--- a/server/src/main/java/org/elasticsearch/action/admin/indices/rollover/LazyRolloverAction.java
+++ b/server/src/main/java/org/elasticsearch/action/admin/indices/rollover/LazyRolloverAction.java
@@ -9,6 +9,7 @@
 
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.ActionType;
+import org.elasticsearch.action.datastreams.autosharding.DataStreamAutoShardingService;
 import org.elasticsearch.action.support.ActionFilters;
 import org.elasticsearch.client.internal.Client;
 import org.elasticsearch.cluster.ClusterState;
@@ -59,6 +60,7 @@ public TransportLazyRolloverAction(
         MetadataRolloverService rolloverService,
         AllocationService allocationService,
         MetadataDataStreamsService metadataDataStreamsService,
+        DataStreamAutoShardingService dataStreamAutoShardingService,
         Client client
     ) {
         super(
@@ -71,7 +73,8 @@ public TransportLazyRolloverAction(
             rolloverService,
             client,
             allocationService,
-            metadataDataStreamsService
+            metadataDataStreamsService,
+            dataStreamAutoShardingService
         );
     }
 
@@ -121,6 +124,7 @@ protected void masterOperation(
             new RolloverRequest(rolloverRequest.getRolloverTarget(), null),
             null,
             trialRolloverResponse,
+            null,
             listener
         );
         submitRolloverTask(rolloverRequest, source, rolloverTask);
diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/rollover/MetadataRolloverService.java b/server/src/main/java/org/elasticsearch/action/admin/indices/rollover/MetadataRolloverService.java
index 4972a784cc2bd..90fa8bc0d66bd 100644
--- a/server/src/main/java/org/elasticsearch/action/admin/indices/rollover/MetadataRolloverService.java
+++ b/server/src/main/java/org/elasticsearch/action/admin/indices/rollover/MetadataRolloverService.java
@@ -8,14 +8,18 @@
 package org.elasticsearch.action.admin.indices.rollover;
 
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 import org.elasticsearch.action.admin.indices.create.CreateIndexClusterStateUpdateRequest;
 import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
+import org.elasticsearch.action.datastreams.autosharding.DataStreamAutoShardingService.AutoShardingResult;
 import org.elasticsearch.action.support.ActiveShardCount;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.metadata.AliasAction;
 import org.elasticsearch.cluster.metadata.AliasMetadata;
 import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
 import org.elasticsearch.cluster.metadata.DataStream;
+import org.elasticsearch.cluster.metadata.DataStreamAutoShardingEvent;
 import org.elasticsearch.cluster.metadata.IndexAbstraction;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
 import org.elasticsearch.cluster.metadata.IndexMetadataStats;
@@ -31,6 +35,7 @@
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.core.Nullable;
+import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.core.Tuple;
 import org.elasticsearch.index.Index;
 import org.elasticsearch.index.IndexMode;
@@ -48,6 +53,7 @@
 import java.util.Locale;
 import java.util.Map;
 import java.util.Objects;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.regex.Pattern;
 
 import static org.elasticsearch.cluster.metadata.IndexAbstraction.Type.ALIAS;
@@ -61,6 +67,7 @@
  * Service responsible for handling rollover requests for write aliases and data streams
  */
 public class MetadataRolloverService {
+    private static final Logger logger = LogManager.getLogger(MetadataRolloverService.class);
     private static final Pattern INDEX_NAME_PATTERN = Pattern.compile("^.*-\\d+$");
     private static final List<IndexAbstraction.Type> VALID_ROLLOVER_TARGETS = List.of(ALIAS, DATA_STREAM);
 
@@ -110,7 +117,8 @@ public RolloverResult rolloverClusterState(
         Instant now,
         boolean silent,
         boolean onlyValidate,
-        @Nullable IndexMetadataStats sourceIndexStats
+        @Nullable IndexMetadataStats sourceIndexStats,
+        @Nullable AutoShardingResult autoShardingResult
     ) throws Exception {
         validate(currentState.metadata(), rolloverTarget, newIndexName, createIndexRequest);
         final IndexAbstraction indexAbstraction = currentState.metadata().getIndicesLookup().get(rolloverTarget);
@@ -134,7 +142,8 @@ public RolloverResult rolloverClusterState(
                 now,
                 silent,
                 onlyValidate,
-                sourceIndexStats
+                sourceIndexStats,
+                autoShardingResult
             );
             default ->
                 // the validate method above prevents this case
@@ -244,7 +253,8 @@ private RolloverResult rolloverDataStream(
         Instant now,
         boolean silent,
         boolean onlyValidate,
-        @Nullable IndexMetadataStats sourceIndexStats
+        @Nullable IndexMetadataStats sourceIndexStats,
+        @Nullable AutoShardingResult autoShardingResult
    ) throws Exception {
 
         if (SnapshotsService.snapshottingDataStreams(currentState, Collections.singleton(dataStream.getName())).isEmpty() == false) {
@@ -281,6 +291,34 @@ private RolloverResult rolloverDataStream(
             return new RolloverResult(newWriteIndexName, originalWriteIndex.getName(), currentState);
         }
 
+        AtomicReference<DataStreamAutoShardingEvent> newAutoShardingEvent = new AtomicReference<>();
+        if (autoShardingResult != null) {
+            // we're auto sharding on rollover
+            assert autoShardingResult.coolDownRemaining().equals(TimeValue.ZERO) : "the auto sharding result must be ready to apply";
+            logger.info("Auto sharding data stream [{}] to [{}]", dataStreamName, autoShardingResult);
+            Settings settingsWithAutoSharding = Settings.builder()
+                .put(createIndexRequest.settings())
+                .put(IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), autoShardingResult.targetNumberOfShards())
+                .build();
+            createIndexRequest.settings(settingsWithAutoSharding);
+            newAutoShardingEvent.set(
+                new DataStreamAutoShardingEvent(
+                    dataStream.getWriteIndex().getName(),
+                    dataStream.getGeneration(),
+                    autoShardingResult.targetNumberOfShards(),
+                    now.toEpochMilli()
+                )
+            );
+        } else if (dataStream.getAutoShardingEvent() != null) {
+            // we're not auto sharding on this rollover but maybe a previous rollover did so we have to use the number of shards
+            // configured by the previous auto sharding event
+            Settings settingsWithAutoSharding = Settings.builder()
+                .put(createIndexRequest.settings())
+                .put(IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), dataStream.getAutoShardingEvent().targetNumberOfShards())
+                .build();
+            createIndexRequest.settings(settingsWithAutoSharding);
+        }
+
         var createIndexClusterStateRequest = prepareDataStreamCreateIndexRequest(
             dataStreamName,
             newWriteIndexName,
@@ -298,7 +336,14 @@ private RolloverResult rolloverDataStream(
             silent,
             (builder, indexMetadata) -> {
                 downgradeBrokenTsdbBackingIndices(dataStream, builder);
-                builder.put(dataStream.rollover(indexMetadata.getIndex(), newGeneration, metadata.isTimeSeriesTemplate(templateV2)));
+                builder.put(
+                    dataStream.rollover(
+                        indexMetadata.getIndex(),
+                        newGeneration,
+                        metadata.isTimeSeriesTemplate(templateV2),
+                        newAutoShardingEvent.get()
+                    )
+                );
             },
             rerouteCompletionIsNotRequired()
         );
diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/rollover/RolloverConditions.java b/server/src/main/java/org/elasticsearch/action/admin/indices/rollover/RolloverConditions.java
index 24f93ccb45348..7bed68ef99a95 100644
--- a/server/src/main/java/org/elasticsearch/action/admin/indices/rollover/RolloverConditions.java
+++ b/server/src/main/java/org/elasticsearch/action/admin/indices/rollover/RolloverConditions.java
@@ -7,6 +7,7 @@
  */
 package org.elasticsearch.action.admin.indices.rollover;
 
+import org.elasticsearch.action.datastreams.autosharding.DataStreamAutoShardingService.AutoShardingResult;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
@@ -243,7 +244,12 @@ public boolean areConditionsMet(Map<String, Boolean> conditionResults) {
             .filter(c -> Condition.Type.MAX == c.type())
             .anyMatch(c -> conditionResults.getOrDefault(c.toString(), false));
 
-        return conditionResults.size() == 0 || (allMinConditionsMet && anyMaxConditionsMet);
+        boolean anyInternalConditionsMet = conditions.values()
+            .stream()
+            .filter(c -> Condition.Type.AUTOMATIC == c.type())
+            .anyMatch(c -> conditionResults.getOrDefault(c.toString(), false));
+
+        return conditionResults.size() == 0 || (allMinConditionsMet && anyMaxConditionsMet) || anyInternalConditionsMet;
     }
 
     public static RolloverConditions fromXContent(XContentParser parser) throws IOException {
@@ -408,6 +414,15 @@ public Builder addMinPrimaryShardDocsCondition(Long numDocs) {
             return this;
         }
 
+        /**
+         * Adds an auto sharding scale up condition.
+         */
+        public Builder addAutoShardingCondition(AutoShardingResult autoShardingResult) {
+            AutoShardingCondition autoShardingCondition = new AutoShardingCondition(autoShardingResult);
+            this.conditions.put(autoShardingCondition.name, autoShardingCondition);
+            return this;
+        }
+
         public RolloverConditions build() {
             return new RolloverConditions(conditions);
         }
diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/rollover/TransportRolloverAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/rollover/TransportRolloverAction.java
index 481eda825b047..8274325886978 100644
--- a/server/src/main/java/org/elasticsearch/action/admin/indices/rollover/TransportRolloverAction.java
+++ b/server/src/main/java/org/elasticsearch/action/admin/indices/rollover/TransportRolloverAction.java
@@ -17,6 +17,8 @@
 import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequest;
 import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
 import org.elasticsearch.action.admin.indices.stats.ShardStats;
+import org.elasticsearch.action.datastreams.autosharding.DataStreamAutoShardingService;
+import org.elasticsearch.action.datastreams.autosharding.DataStreamAutoShardingService.AutoShardingResult;
 import org.elasticsearch.action.support.ActionFilters;
 import org.elasticsearch.action.support.ActiveShardsObserver;
 import org.elasticsearch.action.support.IndicesOptions;
@@ -27,6 +29,7 @@
 import org.elasticsearch.cluster.ClusterStateTaskListener;
 import org.elasticsearch.cluster.block.ClusterBlockException;
 import org.elasticsearch.cluster.block.ClusterBlockLevel;
+import org.elasticsearch.cluster.metadata.DataStream;
 import org.elasticsearch.cluster.metadata.IndexAbstraction;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
 import org.elasticsearch.cluster.metadata.IndexMetadataStats;
@@ -44,7 +47,9 @@
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.util.concurrent.EsExecutors;
 import org.elasticsearch.core.Nullable;
+import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.index.shard.DocsStats;
+import org.elasticsearch.index.shard.IndexingStats;
 import org.elasticsearch.tasks.CancellableTask;
 import org.elasticsearch.tasks.Task;
 import org.elasticsearch.threadpool.ThreadPool;
@@ -60,6 +65,9 @@
 import java.util.Optional;
 import java.util.stream.Collectors;
 
+import static org.elasticsearch.action.datastreams.autosharding.DataStreamAutoShardingService.AutoShardingType.DECREASES_NUMBER_OF_SHARDS;
+import static org.elasticsearch.action.datastreams.autosharding.DataStreamAutoShardingService.AutoShardingType.INCREASE_NUMBER_OF_SHARDS;
+
 /**
  * Main class to swap the index pointed to by an alias, given some conditions
  */
@@ -70,6 +78,7 @@ public class TransportRolloverAction extends TransportMasterNodeAction<RolloverRequest, RolloverResponse> {
     private final Client client;
     private final MasterServiceTaskQueue<RolloverTask> rolloverTaskQueue;
     private final MetadataDataStreamsService metadataDataStreamsService;
+    private final DataStreamAutoShardingService dataStreamAutoShardingService;
 
     @Inject
     public TransportRolloverAction(
@@ -81,7 +90,8 @@ public TransportRolloverAction(
         MetadataRolloverService rolloverService,
         Client client,
         AllocationService allocationService,
-        MetadataDataStreamsService metadataDataStreamsService
+        MetadataDataStreamsService metadataDataStreamsService,
+        DataStreamAutoShardingService dataStreamAutoShardingService
     ) {
         this(
             RolloverAction.INSTANCE,
@@ -93,7 +103,8 @@ public TransportRolloverAction(
             rolloverService,
             client,
             allocationService,
-            metadataDataStreamsService
+            metadataDataStreamsService,
+            dataStreamAutoShardingService
         );
     }
 
@@ -107,7 +118,8 @@ public TransportRolloverAction(
         MetadataRolloverService rolloverService,
         Client client,
         AllocationService allocationService,
-        MetadataDataStreamsService metadataDataStreamsService
+        MetadataDataStreamsService metadataDataStreamsService,
+        DataStreamAutoShardingService dataStreamAutoShardingService
     ) {
         super(
             actionType.name(),
@@ -127,6 +139,7 @@ public TransportRolloverAction(
             new RolloverExecutor(clusterService, allocationService, rolloverService, threadPool)
         );
         this.metadataDataStreamsService = metadataDataStreamsService;
+        this.dataStreamAutoShardingService = dataStreamAutoShardingService;
     }
 
     @Override
@@ -221,6 +234,42 @@ protected void masterOperation(
             listener.delegateFailureAndWrap((delegate, statsResponse) -> {
+                AutoShardingResult rolloverAutoSharding = null;
+                final IndexAbstraction indexAbstraction = clusterState.metadata()
+                    .getIndicesLookup()
+                    .get(rolloverRequest.getRolloverTarget());
+                if (indexAbstraction.getType().equals(IndexAbstraction.Type.DATA_STREAM)) {
+                    DataStream dataStream = (DataStream) indexAbstraction;
+                    Double writeLoad = null;
+                    if (statsResponse != null) {
+                        IndexingStats indexing = statsResponse.getTotal().getIndexing();
+                        if (indexing != null) {
+                            writeLoad = indexing.getTotal().getWriteLoad();
+                        }
+                    }
+
+                    // reach out to the data stream auto sharding service to get a _recommendation_ on auto sharding (note that
+                    // this recommendation might still require waiting for cooldown to lapse or might not be applicable)
+                    var autoShardingRecommendation = dataStreamAutoShardingService.calculate(clusterState, dataStream, writeLoad);
+                    logger.debug("data stream auto sharding result is [{}]", autoShardingRecommendation);
+
+                    rolloverAutoSharding = switch (autoShardingRecommendation.type()) {
+                        case NO_CHANGE_REQUIRED, NOT_APPLICABLE -> null;
+                        case INCREASE_NUMBER_OF_SHARDS -> {
+                            // irrespective of the cool down period we want the implicit INCREASE SHARDS condition to be added to the
+                            // rollover request so the response contains an indication of the remaining cooldown
+                            RolloverConditions conditionsIncludingImplicit = RolloverConditions.newBuilder(rolloverRequest.getConditions())
+                                .addAutoShardingCondition(autoShardingRecommendation)
+                                .build();
+                            rolloverRequest.setConditions(conditionsIncludingImplicit);
+                            yield autoShardingRecommendation.coolDownRemaining().equals(TimeValue.ZERO) ? autoShardingRecommendation : null;
+                        }
+                        case DECREASES_NUMBER_OF_SHARDS -> autoShardingRecommendation.coolDownRemaining().equals(TimeValue.ZERO)
+                            ? autoShardingRecommendation
+                            : null;
+                    };
+                }
+
                 // Evaluate the conditions, so that we can tell without a cluster state update whether a rollover would occur.
                final Map<String, Boolean> trialConditionResults = evaluateConditions(
                    rolloverRequest.getConditionValues(),
@@ -247,7 +296,13 @@ protected void masterOperation(
                 // Pre-check the conditions to see whether we should submit a new cluster state task
                 if (rolloverRequest.areConditionsMet(trialConditionResults)) {
                     String source = "rollover_index source [" + trialRolloverIndexName + "] to target [" + trialRolloverIndexName + "]";
-                    RolloverTask rolloverTask = new RolloverTask(rolloverRequest, statsResponse, trialRolloverResponse, delegate);
+                    RolloverTask rolloverTask = new RolloverTask(
+                        rolloverRequest,
+                        statsResponse,
+                        trialRolloverResponse,
+                        rolloverAutoSharding,
+                        delegate
+                    );
                     submitRolloverTask(rolloverRequest, source, rolloverTask);
                 } else {
                     // conditions not met
@@ -317,8 +372,23 @@ record RolloverTask(
         RolloverRequest rolloverRequest,
         IndicesStatsResponse statsResponse,
         RolloverResponse trialRolloverResponse,
+        @Nullable AutoShardingResult autoShardingResult,
         ActionListener<RolloverResponse> listener
     ) implements ClusterStateTaskListener {
+        RolloverTask {
+            if (autoShardingResult != null) {
+                if ((autoShardingResult.type() != INCREASE_NUMBER_OF_SHARDS && autoShardingResult.type() != DECREASES_NUMBER_OF_SHARDS)
+                    || (autoShardingResult.coolDownRemaining().equals(TimeValue.ZERO) == false)) {
+                    throw new IllegalArgumentException(
+                        "The auto sharding recommendation of a rollover task must be valid (increase or "
+                            + "decrease shards) and effective (no cooldown remaining) but received ["
+                            + autoShardingResult
+                            + "]"
+                    );
+                }
+            }
+        }
+
         @Override
         public void onFailure(Exception e) {
             listener.onFailure(e);
@@ -411,7 +481,8 @@ public ClusterState executeTask(
                     Instant.now(),
                     false,
                     false,
-                    sourceIndexStats
+                    sourceIndexStats,
+                    rolloverTask.autoShardingResult()
                 );
                 results.add(rolloverResult);
                 logger.trace("rollover result [{}]", rolloverResult);
diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java b/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java
index 663edcc11d746..f43f4c2bdafaf 100644
--- a/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java
+++ b/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java
@@ -432,19 +432,25 @@ public DataStreamAutoShardingEvent getAutoShardingEvent() {
      * @param writeIndex new write index
      * @param generation new generation
      * @param timeSeries whether the template that created this data stream is in time series mode
+     * @param autoShardingEvent the auto sharding event this rollover operation is applying
      *
      * @return new {@code DataStream} instance with the rollover operation applied
      */
-    public DataStream rollover(Index writeIndex, long generation, boolean timeSeries) {
+    public DataStream rollover(
+        Index writeIndex,
+        long generation,
+        boolean timeSeries,
+        @Nullable DataStreamAutoShardingEvent autoShardingEvent
+    ) {
         ensureNotReplicated();
-        return unsafeRollover(writeIndex, generation, timeSeries);
+        return unsafeRollover(writeIndex, generation, timeSeries, autoShardingEvent);
     }
 
     /**
-     * Like {@link #rollover(Index, long, boolean)}, but does no validation, use with care only.
+     * Like {@link #rollover(Index, long, boolean, DataStreamAutoShardingEvent)}, but does no validation, use with care only.
      */
-    public DataStream unsafeRollover(Index writeIndex, long generation, boolean timeSeries) {
+    public DataStream unsafeRollover(Index writeIndex, long generation, boolean timeSeries, DataStreamAutoShardingEvent autoShardingEvent) {
         IndexMode indexMode = this.indexMode;
         if ((indexMode == null || indexMode == IndexMode.STANDARD) && timeSeries) {
             // This allows for migrating a data stream to be a tsdb data stream:
@@ -476,7 +482,7 @@ public DataStream unsafeRollover(Index writeIndex, long generation, boolean time
 
     /**
      * Performs a dummy rollover on a {@code DataStream} instance and returns the tuple of the next write index name and next generation
-     * that this {@code DataStream} should roll over to using {@link #rollover(Index, long, boolean)}.
+     * that this {@code DataStream} should roll over to using {@link #rollover(Index, long, boolean, DataStreamAutoShardingEvent)}.
      *
      * @param clusterMetadata Cluster metadata
      *
diff --git a/server/src/main/java/org/elasticsearch/indices/IndicesModule.java b/server/src/main/java/org/elasticsearch/indices/IndicesModule.java
index 795ed2120b098..1ca494b32a183 100644
--- a/server/src/main/java/org/elasticsearch/indices/IndicesModule.java
+++ b/server/src/main/java/org/elasticsearch/indices/IndicesModule.java
@@ -8,6 +8,7 @@
 
 package org.elasticsearch.indices;
 
+import org.elasticsearch.action.admin.indices.rollover.AutoShardingCondition;
 import org.elasticsearch.action.admin.indices.rollover.Condition;
 import org.elasticsearch.action.admin.indices.rollover.MaxAgeCondition;
 import org.elasticsearch.action.admin.indices.rollover.MaxDocsCondition;
@@ -109,7 +110,8 @@ public static List<NamedWriteableRegistry.Entry> getNamedWriteables() {
             new NamedWriteableRegistry.Entry(Condition.class, MaxDocsCondition.NAME, MaxDocsCondition::new),
             new NamedWriteableRegistry.Entry(Condition.class, MaxSizeCondition.NAME, MaxSizeCondition::new),
             new NamedWriteableRegistry.Entry(Condition.class, MaxPrimaryShardSizeCondition.NAME, MaxPrimaryShardSizeCondition::new),
-            new NamedWriteableRegistry.Entry(Condition.class, MaxPrimaryShardDocsCondition.NAME, MaxPrimaryShardDocsCondition::new)
+            new NamedWriteableRegistry.Entry(Condition.class, MaxPrimaryShardDocsCondition.NAME, MaxPrimaryShardDocsCondition::new),
+            new NamedWriteableRegistry.Entry(Condition.class, AutoShardingCondition.NAME, AutoShardingCondition::new)
         );
     }
 
@@ -164,6 +166,11 @@ public static List<NamedXContentRegistry.Entry> getNamedXContents() {
                 Condition.class,
                 new ParseField(MaxPrimaryShardDocsCondition.NAME),
                 (p, c) -> MaxPrimaryShardDocsCondition.fromXContent(p)
+            ),
+            new NamedXContentRegistry.Entry(
+                Condition.class,
+                new ParseField(AutoShardingCondition.NAME),
+                (p, c) -> AutoShardingCondition.fromXContent(p)
             )
         );
     }
diff --git a/server/src/test/java/org/elasticsearch/action/admin/indices/rollover/MetadataRolloverServiceTests.java b/server/src/test/java/org/elasticsearch/action/admin/indices/rollover/MetadataRolloverServiceTests.java
index 83bdc68d0b9c0..23905c9445d18 100644
--- a/server/src/test/java/org/elasticsearch/action/admin/indices/rollover/MetadataRolloverServiceTests.java
+++ b/server/src/test/java/org/elasticsearch/action/admin/indices/rollover/MetadataRolloverServiceTests.java
@@ -547,6 +547,7 @@ public void testRolloverClusterState() throws Exception {
             Instant.now(),
             randomBoolean(),
             false,
+            null,
             null
         );
         long after = testThreadPool.absoluteTimeInMillis();
@@ -615,6 +616,7 @@ public void testRolloverClusterStateForDataStream() throws Exception {
             Instant.now(),
             randomBoolean(),
             false,
+            null,
             null
         );
         long after = testThreadPool.absoluteTimeInMillis();
@@ -701,6 +703,7 @@ public void testValidation() throws Exception {
             Instant.now(),
             randomBoolean(),
             true,
+            null,
             null
         );
 
@@ -742,6 +745,7 @@ public void testRolloverClusterStateForDataStreamNoTemplate() throws Exception {
                 Instant.now(),
                 false,
                 randomBoolean(),
+                null,
                 null
             )
         );
diff --git a/server/src/test/java/org/elasticsearch/action/admin/indices/rollover/TransportRolloverActionTests.java b/server/src/test/java/org/elasticsearch/action/admin/indices/rollover/TransportRolloverActionTests.java
index 950d1a9f22f08..814cff37e0708 100644
--- a/server/src/test/java/org/elasticsearch/action/admin/indices/rollover/TransportRolloverActionTests.java
+++ b/server/src/test/java/org/elasticsearch/action/admin/indices/rollover/TransportRolloverActionTests.java
@@ -16,6 +16,7 @@
 import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
 import org.elasticsearch.action.admin.indices.stats.IndicesStatsTests;
 import org.elasticsearch.action.admin.indices.stats.ShardStats;
+import org.elasticsearch.action.datastreams.autosharding.DataStreamAutoShardingService;
 import org.elasticsearch.action.support.ActionFilters;
 import org.elasticsearch.action.support.PlainActionFuture;
 import org.elasticsearch.action.support.master.AcknowledgedResponse;
@@ -41,6 +42,7 @@
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.core.TimeValue;
+import org.elasticsearch.features.FeatureService;
 import org.elasticsearch.index.IndexMode;
 import org.elasticsearch.index.IndexVersion;
 import org.elasticsearch.index.cache.query.QueryCacheStats;
@@ -111,6 +113,13 @@ public class TransportRolloverActionTests extends ESTestCase {
         WriteLoadForecaster.DEFAULT
     );
 
+    final DataStreamAutoShardingService dataStreamAutoShardingService = new DataStreamAutoShardingService(
+        Settings.EMPTY,
+        mockClusterService,
+        new FeatureService(List.of()),
+        System::currentTimeMillis
+    );
+
     @Before
     public void setUpMocks() {
         when(mockNode.getId()).thenReturn("mocknode");
@@ -374,7 +383,8 @@ public void testConditionEvaluationWhenAliasToWriteAndReadIndicesConsidersOnlyPr
             rolloverService,
             mockClient,
             mockAllocationService,
-            mockMetadataDataStreamService
+            mockMetadataDataStreamService,
+            dataStreamAutoShardingService
         );
 
         // For given alias, verify that condition evaluation fails when the condition doc count is greater than the primaries doc count
@@ -449,7 +459,8 @@ public void testLazyRollover() throws Exception {
             rolloverService,
             mockClient,
             mockAllocationService,
-            mockMetadataDataStreamService
+            mockMetadataDataStreamService,
+            dataStreamAutoShardingService
         );
         final PlainActionFuture<RolloverResponse> future = new PlainActionFuture<>();
         RolloverRequest rolloverRequest = new RolloverRequest("logs-ds", null);
@@ -501,7 +512,8 @@ public void testLazyRolloverFails() throws Exception {
             rolloverService,
             mockClient,
             mockAllocationService,
-            mockMetadataDataStreamService
+            mockMetadataDataStreamService,
+            dataStreamAutoShardingService
        );
 
         // Lazy rollover fails on a concrete index
diff --git a/x-pack/plugin/autoscaling/src/main/java/org/elasticsearch/xpack/autoscaling/storage/ReactiveStorageDeciderService.java b/x-pack/plugin/autoscaling/src/main/java/org/elasticsearch/xpack/autoscaling/storage/ReactiveStorageDeciderService.java
index ffa3a7308da90..26b7bb3b95ef0 100644
--- a/x-pack/plugin/autoscaling/src/main/java/org/elasticsearch/xpack/autoscaling/storage/ReactiveStorageDeciderService.java
+++ b/x-pack/plugin/autoscaling/src/main/java/org/elasticsearch/xpack/autoscaling/storage/ReactiveStorageDeciderService.java
@@ -816,7 +816,12 @@ private SingleForecast forecast(Metadata metadata, DataStream stream, long forec
         for (int i = 0; i < numberNewIndices; ++i) {
             final String uuid = UUIDs.randomBase64UUID();
             final Tuple<String, Long> rolledDataStreamInfo = stream.unsafeNextWriteIndexAndGeneration(state.metadata());
-            stream = stream.unsafeRollover(new Index(rolledDataStreamInfo.v1(), uuid), rolledDataStreamInfo.v2(), false);
+            stream = stream.unsafeRollover(
+                new Index(rolledDataStreamInfo.v1(), uuid),
+                rolledDataStreamInfo.v2(),
+                false,
+                stream.getAutoShardingEvent()
+            );
 
             // this unintentionally copies the in-sync allocation ids too. This has the fortunate effect of these indices
             // not being regarded new by the disk threshold decider, thereby respecting the low watermark threshold even for primaries.
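
Illustrative sketch (not part of the patch): the behavioural core of the change above is that RolloverConditions.areConditionsMet now treats a met AUTOMATIC condition (such as auto_sharding) as sufficient on its own, while MIN/MAX conditions keep their existing combination rule. The stand-in types below (SimpleCondition, Type, areConditionsMet) are hypothetical simplifications, not the real Elasticsearch classes; only the evaluation rule mirrors the patched code.

import java.util.List;
import java.util.Map;

public class AreConditionsMetSketch {

    // simplified stand-in for Condition.Type from the patch
    enum Type { MIN, MAX, AUTOMATIC }

    // hypothetical stand-in for a Condition: just a name and a type
    record SimpleCondition(String name, Type type) {}

    // mirrors the evaluation rule of the patched RolloverConditions#areConditionsMet
    static boolean areConditionsMet(List<SimpleCondition> conditions, Map<String, Boolean> results) {
        boolean allMinMet = conditions.stream()
            .filter(c -> c.type() == Type.MIN)
            .allMatch(c -> results.getOrDefault(c.name(), false));
        boolean anyMaxMet = conditions.stream()
            .filter(c -> c.type() == Type.MAX)
            .anyMatch(c -> results.getOrDefault(c.name(), false));
        boolean anyAutomaticMet = conditions.stream()
            .filter(c -> c.type() == Type.AUTOMATIC)
            .anyMatch(c -> results.getOrDefault(c.name(), false));
        // no conditions at all, the usual min/max combination, or any platform-managed condition
        return results.isEmpty() || (allMinMet && anyMaxMet) || anyAutomaticMet;
    }

    public static void main(String[] args) {
        List<SimpleCondition> conditions = List.of(
            new SimpleCondition("max_docs", Type.MAX),
            new SimpleCondition("auto_sharding", Type.AUTOMATIC)
        );
        // max_docs is not met, but the automatic auto_sharding condition is, so the rollover proceeds
        System.out.println(areConditionsMet(conditions, Map.of("max_docs", false, "auto_sharding", true)));  // true
        // neither condition is met, so no rollover
        System.out.println(areConditionsMet(conditions, Map.of("max_docs", false, "auto_sharding", false))); // false
    }
}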