From fab1fecbea307a22fb1b3c0a583d3dc328e717a7 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Wed, 31 Mar 2021 09:26:51 +0200 Subject: [PATCH] Allow closing a write index of a data stream. (#71031) Backport of #70908 to 7.12 branch. Prior to this commit when attempting to close a data stream a validation error is returned indicating that it is forbidden to close a write index of a data stream. The idea behind that is to ensure that a data stream always can accept writes. For the same reason deleting a write index is not allowed (the write index can only be deleted when deleting the entire data stream). However closing an index isn't as destructive as deleting an index (an open index request makes the write index available again) and there are other cases where a data stream can't accept writes. For example when primary shards of the write index are not available. So the original reasoning for not allowing to close a write index isn't that strong. On top of this is that this also avoids certain administrative operations from being performed. For example restoring a snapshot containing data streams that already exist in the cluster (in place restore). Closes #70903 #70861 --- docs/reference/indices/open-close.asciidoc | 7 +- .../apis/restore-snapshot-api.asciidoc | 5 +- .../restore-snapshot.asciidoc | 5 +- .../metadata/MetadataIndexStateService.java | 12 -- .../MetadataIndexStateServiceTests.java | 39 ------ .../datastreams/DataStreamsSnapshotsIT.java | 129 ++++++++++++++++-- .../action/TransportFreezeIndexAction.java | 17 +++ .../test/data_stream/20_unsupported_apis.yml | 33 ----- 8 files changed, 145 insertions(+), 102 deletions(-) diff --git a/docs/reference/indices/open-close.asciidoc b/docs/reference/indices/open-close.asciidoc index d5332df87d6ac..84be14809e896 100644 --- a/docs/reference/indices/open-close.asciidoc +++ b/docs/reference/indices/open-close.asciidoc @@ -64,10 +64,11 @@ Closed indices consume a significant amount of disk-space which can cause problems in managed environments. Closing indices can be disabled via the cluster settings API by setting `cluster.indices.close.enable` to `false`. The default is `true`. -The current write index on a data stream cannot be closed. In order to close -the current write index, the data stream must first be +On 7.12.0 and earlier the current write index on a data stream cannot be closed. +In order to close the current write index, the data stream must first be <> so that a new write index is created -and then the previous write index can be closed. +and then the previous write index can be closed. This restriction doesn't apply +from version 7.12.1. // end::closed-index[] diff --git a/docs/reference/snapshot-restore/apis/restore-snapshot-api.asciidoc b/docs/reference/snapshot-restore/apis/restore-snapshot-api.asciidoc index 187bc3b534a80..483e80fbbbdc9 100644 --- a/docs/reference/snapshot-restore/apis/restore-snapshot-api.asciidoc +++ b/docs/reference/snapshot-restore/apis/restore-snapshot-api.asciidoc @@ -72,8 +72,9 @@ POST /_snapshot/my_repository/my_snapshot/_restore Use the restore snapshot API to restore a snapshot of a cluster, including all data streams and indices in the snapshot. If you do not want to restore the entire snapshot, you can select specific data streams or indices to restore. -NOTE: You cannot restore a data stream if a stream with the same name already -exists. +NOTE: On 7.12.0 and earlier, you cannot restore a data stream if a stream with +the same name already exists. This restriction doesn't apply from version +7.12.1. You can run the restore operation on a cluster that contains an elected <> and has data nodes with enough capacity to accommodate the snapshot diff --git a/docs/reference/snapshot-restore/restore-snapshot.asciidoc b/docs/reference/snapshot-restore/restore-snapshot.asciidoc index ac46e43315108..5fa3d404991d7 100644 --- a/docs/reference/snapshot-restore/restore-snapshot.asciidoc +++ b/docs/reference/snapshot-restore/restore-snapshot.asciidoc @@ -107,8 +107,9 @@ new indices if they didn't exist in the cluster. If a data stream is restored, its backing indices are also restored. The restore operation automatically opens restored backing indices if they were closed. -NOTE: You cannot restore a data stream if a stream with the same name already -exists. +NOTE: On 7.12.0 and earlier, you cannot restore a data stream if a stream with +the same name already exists. This restriction doesn't apply from version +7.12.1. In addition to entire data streams, you can restore only specific backing indices from a snapshot. However, restored backing indices are not automatically diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataIndexStateService.java b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataIndexStateService.java index d1da1aa96bbc6..ecfb4defcf264 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataIndexStateService.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataIndexStateService.java @@ -135,18 +135,6 @@ public void closeIndices(final CloseIndexClusterStateUpdateRequest request, fina if (concreteIndices == null || concreteIndices.length == 0) { throw new IllegalArgumentException("Index name is required"); } - List writeIndices = new ArrayList<>(); - SortedMap lookup = clusterService.state().metadata().getIndicesLookup(); - for (Index index : concreteIndices) { - IndexAbstraction ia = lookup.get(index.getName()); - if (ia != null && ia.getParentDataStream() != null && ia.getParentDataStream().getWriteIndex().getIndex().equals(index)) { - writeIndices.add(index.getName()); - } - } - if (writeIndices.size() > 0) { - throw new IllegalArgumentException("cannot close the following data stream write indices [" + - Strings.collectionToCommaDelimitedString(writeIndices) + "]"); - } clusterService.submitStateUpdateTask("add-block-index-to-close " + Arrays.toString(concreteIndices), new ClusterStateUpdateTask(Priority.URGENT, request.masterNodeTimeout()) { diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataIndexStateServiceTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataIndexStateServiceTests.java index fb7f734c3a771..7dd41df6ddd33 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataIndexStateServiceTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataIndexStateServiceTests.java @@ -9,7 +9,6 @@ package org.elasticsearch.cluster.metadata; import org.elasticsearch.Version; -import org.elasticsearch.action.admin.indices.close.CloseIndexClusterStateUpdateRequest; import org.elasticsearch.action.admin.indices.close.CloseIndexResponse; import org.elasticsearch.action.admin.indices.close.CloseIndexResponse.IndexResult; import org.elasticsearch.cluster.ClusterName; @@ -27,11 +26,8 @@ import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.cluster.routing.UnassignedInfo; -import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Nullable; -import org.elasticsearch.common.Strings; import org.elasticsearch.common.collect.ImmutableOpenMap; -import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexNotFoundException; @@ -43,15 +39,11 @@ import org.elasticsearch.snapshots.SnapshotInfoTests; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.VersionUtils; -import org.hamcrest.CoreMatchers; -import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; -import java.util.List; -import java.util.Locale; import java.util.Map; import java.util.Set; import java.util.stream.Collectors; @@ -72,8 +64,6 @@ import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; public class MetadataIndexStateServiceTests extends ESTestCase { @@ -379,35 +369,6 @@ public void testCloseFailedIfBlockDisappeared() { assertThat(failedIndices, equalTo(disappearedIndices)); } - public void testCloseCurrentWriteIndexForDataStream() { - int numDataStreams = randomIntBetween(1, 3); - List> dataStreamsToCreate = new ArrayList<>(); - List writeIndices = new ArrayList<>(); - for (int k = 0; k < numDataStreams; k++) { - String dataStreamName = randomAlphaOfLength(6).toLowerCase(Locale.ROOT); - int numBackingIndices = randomIntBetween(1, 5); - dataStreamsToCreate.add(new Tuple<>(dataStreamName, numBackingIndices)); - writeIndices.add(DataStream.getDefaultBackingIndexName(dataStreamName, numBackingIndices)); - } - ClusterState cs = DataStreamTestHelper.getClusterStateWithDataStreams(dataStreamsToCreate, - org.elasticsearch.common.collect.List.of()); - - ClusterService clusterService = mock(ClusterService.class); - when(clusterService.state()).thenReturn(cs); - - List indicesToDelete = randomSubsetOf(randomIntBetween(1, numDataStreams), writeIndices); - Index[] indicesToDeleteArray = new Index[indicesToDelete.size()]; - for (int k = 0; k < indicesToDelete.size(); k++) { - Index indexToDelete = cs.metadata().index(indicesToDelete.get(k)).getIndex(); - indicesToDeleteArray[k] = indexToDelete; - } - MetadataIndexStateService service = new MetadataIndexStateService(clusterService, null, null, null, null, null, null, null); - CloseIndexClusterStateUpdateRequest request = new CloseIndexClusterStateUpdateRequest(0L).indices(indicesToDeleteArray); - Exception e = expectThrows(IllegalArgumentException.class, () -> service.closeIndices(request, null)); - assertThat(e.getMessage(), CoreMatchers.containsString("cannot close the following data stream write indices [" + - Strings.collectionToCommaDelimitedString(indicesToDelete) + "]")); - } - public static ClusterState addOpenedIndex(final String index, final int numShards, final int numReplicas, final ClusterState state) { return addIndex(state, index, numShards, numReplicas, IndexMetadata.State.OPEN, null); } diff --git a/x-pack/plugin/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamsSnapshotsIT.java b/x-pack/plugin/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamsSnapshotsIT.java index 7e22b36da1341..b2bcf49d4be1c 100644 --- a/x-pack/plugin/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamsSnapshotsIT.java +++ b/x-pack/plugin/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamsSnapshotsIT.java @@ -12,30 +12,32 @@ import org.elasticsearch.action.DocWriteResponse; import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse; import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse; +import org.elasticsearch.action.admin.indices.close.CloseIndexRequest; import org.elasticsearch.action.admin.indices.rollover.RolloverRequest; import org.elasticsearch.action.admin.indices.rollover.RolloverResponse; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.unit.ByteSizeUnit; -import org.elasticsearch.snapshots.AbstractSnapshotIntegTestCase; -import org.elasticsearch.snapshots.RestoreInfo; -import org.elasticsearch.snapshots.SnapshotInProgressException; -import org.elasticsearch.snapshots.SnapshotInfo; -import org.elasticsearch.snapshots.SnapshotRestoreException; -import org.elasticsearch.snapshots.SnapshotState; -import org.elasticsearch.xpack.core.action.CreateDataStreamAction; -import org.elasticsearch.xpack.core.action.DeleteDataStreamAction; -import org.elasticsearch.xpack.core.action.GetDataStreamAction; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.metadata.DataStream; import org.elasticsearch.common.collect.List; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.ByteSizeUnit; +import org.elasticsearch.index.Index; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.search.SearchHit; +import org.elasticsearch.snapshots.AbstractSnapshotIntegTestCase; +import org.elasticsearch.snapshots.RestoreInfo; +import org.elasticsearch.snapshots.SnapshotInProgressException; +import org.elasticsearch.snapshots.SnapshotInfo; +import org.elasticsearch.snapshots.SnapshotRestoreException; +import org.elasticsearch.snapshots.SnapshotState; import org.elasticsearch.snapshots.mockstore.MockRepository; import org.elasticsearch.test.ESIntegTestCase; +import org.elasticsearch.xpack.core.action.CreateDataStreamAction; +import org.elasticsearch.xpack.core.action.DeleteDataStreamAction; +import org.elasticsearch.xpack.core.action.GetDataStreamAction; import org.elasticsearch.xpack.datastreams.DataStreamsPlugin; import org.hamcrest.Matchers; import org.junit.After; @@ -46,6 +48,7 @@ import java.util.Collections; import java.util.Map; import java.util.concurrent.ExecutionException; +import java.util.stream.Collectors; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.hamcrest.Matchers.contains; @@ -53,6 +56,7 @@ import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasItems; +import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.nullValue; @@ -150,6 +154,109 @@ public void testSnapshotAndRestore() throws Exception { assertEquals(DS_BACKING_INDEX_NAME, ds.getDataStreams().get(0).getDataStream().getIndices().get(0).getName()); } + public void testSnapshotAndRestoreAllDataStreamsInPlace() throws Exception { + CreateSnapshotResponse createSnapshotResponse = client.admin() + .cluster() + .prepareCreateSnapshot(REPO, SNAPSHOT) + .setWaitForCompletion(true) + .setIndices("ds") + .setIncludeGlobalState(false) + .get(); + + RestStatus status = createSnapshotResponse.getSnapshotInfo().status(); + assertEquals(RestStatus.OK, status); + + assertEquals(Collections.singletonList(DS_BACKING_INDEX_NAME), getSnapshot(REPO, SNAPSHOT).indices()); + + // Close all indices: + CloseIndexRequest closeIndexRequest = new CloseIndexRequest("*"); + closeIndexRequest.indicesOptions(IndicesOptions.strictExpandHidden()); + assertAcked(client.admin().indices().close(closeIndexRequest).actionGet()); + + RestoreSnapshotResponse restoreSnapshotResponse = client.admin() + .cluster() + .prepareRestoreSnapshot(REPO, SNAPSHOT) + .setWaitForCompletion(true) + .setIndices("ds") + .get(); + assertEquals(1, restoreSnapshotResponse.getRestoreInfo().successfulShards()); + + assertEquals(DOCUMENT_SOURCE, client.prepareGet(DS_BACKING_INDEX_NAME, "_doc", id).get().getSourceAsMap()); + SearchHit[] hits = client.prepareSearch("ds").get().getHits().getHits(); + assertEquals(1, hits.length); + assertEquals(DOCUMENT_SOURCE, hits[0].getSourceAsMap()); + + GetDataStreamAction.Request getDataSteamRequest = new GetDataStreamAction.Request(new String[] { "*" }); + GetDataStreamAction.Response ds = client.execute(GetDataStreamAction.INSTANCE, getDataSteamRequest).get(); + assertThat( + ds.getDataStreams().stream().map(e -> e.getDataStream().getName()).collect(Collectors.toList()), + contains(equalTo("ds"), equalTo("other-ds")) + ); + java.util.List backingIndices = ds.getDataStreams().get(0).getDataStream().getIndices(); + assertThat(backingIndices.stream().map(Index::getName).collect(Collectors.toList()), contains(DS_BACKING_INDEX_NAME)); + backingIndices = ds.getDataStreams().get(1).getDataStream().getIndices(); + String expectedBackingIndexName = DataStream.getDefaultBackingIndexName("other-ds", 1); + assertThat(backingIndices.stream().map(Index::getName).collect(Collectors.toList()), contains(expectedBackingIndexName)); + } + + public void testSnapshotAndRestoreInPlace() throws Exception { + CreateSnapshotResponse createSnapshotResponse = client.admin() + .cluster() + .prepareCreateSnapshot(REPO, SNAPSHOT) + .setWaitForCompletion(true) + .setIndices("ds") + .setIncludeGlobalState(false) + .get(); + + RestStatus status = createSnapshotResponse.getSnapshotInfo().status(); + assertEquals(RestStatus.OK, status); + + assertEquals(Collections.singletonList(DS_BACKING_INDEX_NAME), getSnapshot(REPO, SNAPSHOT).indices()); + + // A rollover after taking snapshot. The new backing index should be a standalone index after restoring + // and not part of the data stream: + RolloverRequest rolloverRequest = new RolloverRequest("ds", null); + RolloverResponse rolloverResponse = client.admin().indices().rolloverIndex(rolloverRequest).actionGet(); + assertThat(rolloverResponse.isRolledOver(), is(true)); + assertThat(rolloverResponse.getNewIndex(), equalTo(DataStream.getDefaultBackingIndexName("ds", 2))); + + // Close all backing indices of ds data stream: + CloseIndexRequest closeIndexRequest = new CloseIndexRequest(".ds-ds-*"); + closeIndexRequest.indicesOptions(IndicesOptions.strictExpandHidden()); + assertAcked(client.admin().indices().close(closeIndexRequest).actionGet()); + + RestoreSnapshotResponse restoreSnapshotResponse = client.admin() + .cluster() + .prepareRestoreSnapshot(REPO, SNAPSHOT) + .setWaitForCompletion(true) + .setIndices("ds") + .get(); + assertEquals(1, restoreSnapshotResponse.getRestoreInfo().successfulShards()); + + assertEquals(DOCUMENT_SOURCE, client.prepareGet(DS_BACKING_INDEX_NAME, "_doc", id).get().getSourceAsMap()); + SearchHit[] hits = client.prepareSearch("ds").get().getHits().getHits(); + assertEquals(1, hits.length); + assertEquals(DOCUMENT_SOURCE, hits[0].getSourceAsMap()); + + GetDataStreamAction.Request getDataSteamRequest = new GetDataStreamAction.Request(new String[] { "ds" }); + GetDataStreamAction.Response ds = client.execute(GetDataStreamAction.INSTANCE, getDataSteamRequest).actionGet(); + assertThat( + ds.getDataStreams().stream().map(e -> e.getDataStream().getName()).collect(Collectors.toList()), + contains(equalTo("ds")) + ); + java.util.List backingIndices = ds.getDataStreams().get(0).getDataStream().getIndices(); + assertThat(ds.getDataStreams().get(0).getDataStream().getIndices(), hasSize(1)); + assertThat(backingIndices.stream().map(Index::getName).collect(Collectors.toList()), contains(equalTo(DS_BACKING_INDEX_NAME))); + + // The backing index created as part of rollover should still exist (but just not part of the data stream) + assertThat(indexExists(DataStream.getDefaultBackingIndexName("ds", 2)), is(true)); + // An additional rollover should create a new backing index (3th generation) and leave .ds-ds-...-2 index as is: + rolloverRequest = new RolloverRequest("ds", null); + rolloverResponse = client.admin().indices().rolloverIndex(rolloverRequest).actionGet(); + assertThat(rolloverResponse.isRolledOver(), is(true)); + assertThat(rolloverResponse.getNewIndex(), equalTo(DataStream.getDefaultBackingIndexName("ds", 3))); + } + public void testSnapshotAndRestoreAll() throws Exception { CreateSnapshotResponse createSnapshotResponse = client.admin() .cluster() diff --git a/x-pack/plugin/frozen-indices/src/main/java/org/elasticsearch/xpack/frozen/action/TransportFreezeIndexAction.java b/x-pack/plugin/frozen-indices/src/main/java/org/elasticsearch/xpack/frozen/action/TransportFreezeIndexAction.java index a2727f57a9d50..3d0e598722a12 100644 --- a/x-pack/plugin/frozen-indices/src/main/java/org/elasticsearch/xpack/frozen/action/TransportFreezeIndexAction.java +++ b/x-pack/plugin/frozen-indices/src/main/java/org/elasticsearch/xpack/frozen/action/TransportFreezeIndexAction.java @@ -24,12 +24,14 @@ import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.block.ClusterBlocks; +import org.elasticsearch.cluster.metadata.IndexAbstraction; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.metadata.MetadataIndexStateService; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Priority; +import org.elasticsearch.common.Strings; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.Index; @@ -44,6 +46,7 @@ import java.util.ArrayList; import java.util.List; +import java.util.SortedMap; public final class TransportFreezeIndexAction extends TransportMasterNodeAction { @@ -158,6 +161,20 @@ public void onFailure(Exception e) { }) { @Override public ClusterState execute(ClusterState currentState) { + List writeIndices = new ArrayList<>(); + SortedMap lookup = currentState.metadata().getIndicesLookup(); + for (Index index : concreteIndices) { + IndexAbstraction ia = lookup.get(index.getName()); + if (ia != null && ia.getParentDataStream() != null && + ia.getParentDataStream().getWriteIndex().getIndex().equals(index)) { + writeIndices.add(index.getName()); + } + } + if (writeIndices.size() > 0) { + throw new IllegalArgumentException("cannot freeze the following data stream write indices [" + + Strings.collectionToCommaDelimitedString(writeIndices) + "]"); + } + final Metadata.Builder builder = Metadata.builder(currentState.metadata()); ClusterBlocks.Builder blocks = ClusterBlocks.builder().blocks(currentState.blocks()); for (Index index : concreteIndices) { diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/test/data_stream/20_unsupported_apis.yml b/x-pack/plugin/src/test/resources/rest-api-spec/test/data_stream/20_unsupported_apis.yml index edc92c28c5b06..a3461b5c2c1db 100644 --- a/x-pack/plugin/src/test/resources/rest-api-spec/test/data_stream/20_unsupported_apis.yml +++ b/x-pack/plugin/src/test/resources/rest-api-spec/test/data_stream/20_unsupported_apis.yml @@ -142,39 +142,6 @@ name: simple-data-stream1 - is_true: acknowledged ---- -"Close write index for data stream fails": - - skip: - version: " - 7.8.99" - reason: "data streams only supported in 7.9+" - features: allowed_warnings - - - do: - allowed_warnings: - - "index template [my-template1] has index patterns [simple-data-stream1] matching patterns from existing older templates [global] with patterns (global => [*]); this template [my-template1] will take precedence during new index creation" - indices.put_index_template: - name: my-template1 - body: - index_patterns: [simple-data-stream1] - data_stream: {} - - - do: - indices.create_data_stream: - name: simple-data-stream1 - - is_true: acknowledged - - - do: - catch: bad_request - indices.close: - index: ".ds-simple-data-stream1-*000001" - allowed_warnings: - - "the default value for the ?wait_for_active_shards parameter will change from '0' to 'index-setting' in version 8; specify '?wait_for_active_shards=index-setting' to adopt the future default behaviour, or '?wait_for_active_shards=0' to preserve today's behaviour" - - - do: - indices.delete_data_stream: - name: simple-data-stream1 - - is_true: acknowledged - --- "Prohibit split on data stream's write index": - skip: