Skip to content

Commit

Permalink
Make downsample target index replicas configurable (elastic#99712)
Browse files Browse the repository at this point in the history
* fix: make downsample target index replicas configurable

Introducing this setting in the downsampling logic will allow us
to override its value for Serverless deployments by means of a plugin.
In Steteful deployments we will use a default value of 0 to avoid
unnecessary replication. In Serverless deployments a minimum value
of 1 is required to set the value of 'index.number_of_replicas'.
This is because in Serverless deployments indices without replicas
are not readable and that would break our persistent task receovery
mechanism which relies on reading the latest written tsid into the
downsampling target index. Not having replicas in Serverless would
result in downsampling persistent tasks to always start from scratch.
  • Loading branch information
salvatore-campagna authored and jakelandis committed Oct 2, 2023
1 parent 56d036e commit 0ed1a8e
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 30 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/99712.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 99712
summary: Make downsample target index replicas configurable
area: Downsampling
type: bug
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ public class Downsample extends Plugin implements ActionPlugin, PersistentTaskPl

public static final String DOWSAMPLE_TASK_THREAD_POOL_NAME = "downsample_indexing";
private static final int DOWNSAMPLE_TASK_THREAD_POOL_QUEUE_SIZE = 256;
public static final String DOWNSAMPLE_MIN_NUMBER_OF_REPLICAS_NAME = "downsample.min_number_of_replicas";

@Override
public List<ExecutorBuilder<?>> getExecutorBuilders(Settings settings) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -319,35 +319,42 @@ protected void masterOperation(
return;
}
// 3. Create downsample index
createDownsampleIndex(downsampleIndexName, sourceIndexMetadata, mapping, request, ActionListener.wrap(createIndexResp -> {
if (createIndexResp.isAcknowledged()) {
performShardDownsampling(
request,
listener,
sourceIndexMetadata,
downsampleIndexName,
parentTask,
metricFields,
labelFields
);
} else {
listener.onFailure(new ElasticsearchException("Failed to create downsample index [" + downsampleIndexName + "]"));
}
}, e -> {
if (e instanceof ResourceAlreadyExistsException) {
performShardDownsampling(
request,
listener,
sourceIndexMetadata,
downsampleIndexName,
parentTask,
metricFields,
labelFields
);
} else {
listener.onFailure(e);
}
}));
createDownsampleIndex(
clusterService.getSettings(),
downsampleIndexName,
sourceIndexMetadata,
mapping,
request,
ActionListener.wrap(createIndexResp -> {
if (createIndexResp.isAcknowledged()) {
performShardDownsampling(
request,
listener,
sourceIndexMetadata,
downsampleIndexName,
parentTask,
metricFields,
labelFields
);
} else {
listener.onFailure(new ElasticsearchException("Failed to create downsample index [" + downsampleIndexName + "]"));
}
}, e -> {
if (e instanceof ResourceAlreadyExistsException) {
performShardDownsampling(
request,
listener,
sourceIndexMetadata,
downsampleIndexName,
parentTask,
metricFields,
labelFields
);
} else {
listener.onFailure(e);
}
})
);
}, listener::onFailure));
}

Expand Down Expand Up @@ -714,6 +721,7 @@ private static void addDynamicTemplates(final XContentBuilder builder) throws IO
}

private void createDownsampleIndex(
Settings settings,
String downsampleIndexName,
IndexMetadata sourceIndexMetadata,
String mapping,
Expand All @@ -729,10 +737,11 @@ private void createDownsampleIndex(
* We should note that there is a risk of losing a node during the downsample process. In this
* case downsample will fail.
*/
int numberOfReplicas = settings.getAsInt(Downsample.DOWNSAMPLE_MIN_NUMBER_OF_REPLICAS_NAME, 0);
Settings.Builder builder = Settings.builder()
.put(IndexMetadata.SETTING_INDEX_HIDDEN, true)
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, sourceIndexMetadata.getNumberOfShards())
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, String.valueOf(numberOfReplicas))
.put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), "-1")
.put(IndexMetadata.INDEX_DOWNSAMPLE_STATUS.getKey(), DownsampleTaskStatus.STARTED);
if (sourceIndexMetadata.getSettings().hasValue(MapperService.INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING.getKey())) {
Expand Down

0 comments on commit 0ed1a8e

Please sign in to comment.