Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove extra repo flag to access archive indices #84222

Merged
merged 1 commit into from
Feb 22, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,18 @@

package org.elasticsearch.plugins;

import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.env.Environment;
import org.elasticsearch.indices.recovery.RecoverySettings;
import org.elasticsearch.repositories.Repository;
import org.elasticsearch.snapshots.Snapshot;
import org.elasticsearch.xcontent.NamedXContentRegistry;

import java.util.Collections;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.BiConsumer;

/**
* An extension point for {@link Plugin} implementations to add custom snapshot repositories.
Expand Down Expand Up @@ -66,7 +67,7 @@ default Map<String, Repository.Factory> getInternalRepositories(
*
* returns null if no check is provided
*/
default Consumer<IndexMetadata> addPreRestoreCheck() {
default BiConsumer<Snapshot, Version> addPreRestoreVersionCheck() {
return null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,16 @@

package org.elasticsearch.repositories;

import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.env.Environment;
import org.elasticsearch.indices.recovery.RecoverySettings;
import org.elasticsearch.plugins.RepositoryPlugin;
import org.elasticsearch.repositories.fs.FsRepository;
import org.elasticsearch.snapshots.Snapshot;
import org.elasticsearch.snapshots.SnapshotRestoreException;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xcontent.NamedXContentRegistry;

Expand All @@ -24,7 +26,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.BiConsumer;

/**
* Sets up classes for Snapshot/Restore.
Expand Down Expand Up @@ -83,13 +85,27 @@ public RepositoriesModule(
}
}

List<Consumer<IndexMetadata>> preRestoreChecks = new ArrayList<>();
List<BiConsumer<Snapshot, Version>> preRestoreChecks = new ArrayList<>();
for (RepositoryPlugin repoPlugin : repoPlugins) {
Consumer<IndexMetadata> preRestoreCheck = repoPlugin.addPreRestoreCheck();
BiConsumer<Snapshot, Version> preRestoreCheck = repoPlugin.addPreRestoreVersionCheck();
if (preRestoreCheck != null) {
preRestoreChecks.add(preRestoreCheck);
}
}
if (preRestoreChecks.isEmpty()) {
preRestoreChecks.add((snapshot, version) -> {
if (version.before(Version.CURRENT.minimumIndexCompatibilityVersion())) {
throw new SnapshotRestoreException(
snapshot,
"the snapshot was created with Elasticsearch version ["
+ version
+ "] which is below the current versions minimum index compatibility version ["
+ Version.CURRENT.minimumIndexCompatibilityVersion()
+ "]"
);
}
});
}

Settings settings = env.settings();
Map<String, Repository.Factory> repositoryTypes = Collections.unmodifiableMap(factories);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.action.StepListener;
Expand Down Expand Up @@ -44,6 +45,7 @@
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.index.Index;
import org.elasticsearch.repositories.blobstore.MeteredBlobStoreRepository;
import org.elasticsearch.snapshots.Snapshot;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

Expand All @@ -56,7 +58,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
Expand Down Expand Up @@ -98,7 +100,7 @@ public class RepositoriesService extends AbstractLifecycleComponent implements C
private volatile Map<String, Repository> repositories = Collections.emptyMap();
private final RepositoriesStatsArchive repositoriesStatsArchive;

private final List<Consumer<IndexMetadata>> preRestoreChecks;
private final List<BiConsumer<Snapshot, Version>> preRestoreChecks;

public RepositoriesService(
Settings settings,
Expand All @@ -107,7 +109,7 @@ public RepositoriesService(
Map<String, Repository.Factory> typesRegistry,
Map<String, Repository.Factory> internalTypesRegistry,
ThreadPool threadPool,
List<Consumer<IndexMetadata>> preRestoreChecks
List<BiConsumer<Snapshot, Version>> preRestoreChecks
) {
this.typesRegistry = typesRegistry;
this.internalTypesRegistry = internalTypesRegistry;
Expand Down Expand Up @@ -781,7 +783,7 @@ private static RepositoryConflictException newRepositoryConflictException(String
);
}

public List<Consumer<IndexMetadata>> getPreRestoreChecks() {
public List<BiConsumer<Snapshot, Version>> getPreRestoreVersionChecks() {
return preRestoreChecks;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,7 @@ private void startRestore(
final Snapshot snapshot = new Snapshot(repositoryName, snapshotId);

// Make sure that we can restore from this snapshot
validateSnapshotRestorable(request, repository.getMetadata(), snapshotInfo);
validateSnapshotRestorable(request, repository.getMetadata(), snapshotInfo, repositoriesService.getPreRestoreVersionChecks());

// Get the global state if necessary
Metadata globalMetadata = null;
Expand Down Expand Up @@ -957,10 +957,16 @@ private static String renameIndex(String index, RestoreSnapshotRequest request,

/**
* Checks that snapshots can be restored and have compatible version
* @param repository repository name
* @param repository repository name
* @param snapshotInfo snapshot metadata
* @param preRestoreVersionChecks
*/
static void validateSnapshotRestorable(RestoreSnapshotRequest request, RepositoryMetadata repository, SnapshotInfo snapshotInfo) {
static void validateSnapshotRestorable(
RestoreSnapshotRequest request,
RepositoryMetadata repository,
SnapshotInfo snapshotInfo,
List<BiConsumer<Snapshot, Version>> preRestoreVersionChecks
) {
if (snapshotInfo.state().restorable() == false) {
throw new SnapshotRestoreException(
new Snapshot(repository.name(), snapshotInfo.snapshotId()),
Expand All @@ -977,17 +983,8 @@ static void validateSnapshotRestorable(RestoreSnapshotRequest request, Repositor
+ "]"
);
}
if (ALLOW_BWC_INDICES_SETTING.get(repository.settings()) == false
&& snapshotInfo.version().before(Version.CURRENT.minimumIndexCompatibilityVersion())) {
throw new SnapshotRestoreException(
new Snapshot(repository.name(), snapshotInfo.snapshotId()),
"the snapshot was created with Elasticsearch version ["
+ snapshotInfo.version()
+ "] which is below the current versions minimum index compatibility version ["
+ Version.CURRENT.minimumIndexCompatibilityVersion()
+ "]"
);
}
Snapshot snapshot = new Snapshot(repository.name(), snapshotInfo.snapshotId());
preRestoreVersionChecks.forEach(c -> c.accept(snapshot, snapshotInfo.version()));
if (request.includeGlobalState() && snapshotInfo.includeGlobalState() == Boolean.FALSE) {
throw new SnapshotRestoreException(
new Snapshot(repository.name(), snapshotInfo.snapshotId()),
Expand All @@ -996,12 +993,6 @@ static void validateSnapshotRestorable(RestoreSnapshotRequest request, Repositor
}
}

public static final Setting<Boolean> ALLOW_BWC_INDICES_SETTING = Setting.boolSetting(
"allow_bwc_indices",
false,
Setting.Property.NodeScope
);

public static boolean failed(SnapshotInfo snapshot, String index) {
for (SnapshotShardFailure failure : snapshot.shardFailures()) {
if (index.equals(failure.index())) {
Expand Down Expand Up @@ -1277,7 +1268,8 @@ public ClusterState execute(ClusterState currentState) {
for (Map.Entry<String, IndexId> indexEntry : indicesToRestore.entrySet()) {
final IndexId index = indexEntry.getValue();
final IndexMetadata originalIndexMetadata = metadata.index(index.getName());
repositoriesService.getPreRestoreChecks().forEach(check -> check.accept(originalIndexMetadata));
repositoriesService.getPreRestoreVersionChecks()
.forEach(check -> check.accept(snapshot, originalIndexMetadata.getCreationVersion()));
IndexMetadata snapshotIndexMetadata = updateIndexSettings(
snapshot,
originalIndexMetadata,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ public void testNotAllowToRestoreGlobalStateFromSnapshotWithoutOne() {

var exception = expectThrows(
SnapshotRestoreException.class,
() -> RestoreService.validateSnapshotRestorable(request, repository, snapshotInfo)
() -> RestoreService.validateSnapshotRestorable(request, repository, snapshotInfo, List.of())
);
assertThat(
exception.getMessage(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
*/
package org.elasticsearch.xpack.core;

import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionResponse;
Expand All @@ -20,7 +21,6 @@
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.coordination.ElectionStrategy;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.IndexTemplateMetadata;
import org.elasticsearch.cluster.metadata.SingleNodeShutdownMetadata;
Expand Down Expand Up @@ -84,6 +84,7 @@
import org.elasticsearch.script.ScriptContext;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.search.internal.ShardSearchRequest;
import org.elasticsearch.snapshots.Snapshot;
import org.elasticsearch.threadpool.ExecutorBuilder;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.Transport;
Expand All @@ -105,7 +106,6 @@
import java.util.Optional;
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.LongSupplier;
import java.util.function.Predicate;
Expand Down Expand Up @@ -571,12 +571,12 @@ public Map<String, Repository.Factory> getInternalRepositories(
}

@Override
public Consumer<IndexMetadata> addPreRestoreCheck() {
List<Consumer<IndexMetadata>> checks = filterPlugins(RepositoryPlugin.class).stream()
.map(RepositoryPlugin::addPreRestoreCheck)
public BiConsumer<Snapshot, Version> addPreRestoreVersionCheck() {
List<BiConsumer<Snapshot, Version>> checks = filterPlugins(RepositoryPlugin.class).stream()
.map(RepositoryPlugin::addPreRestoreVersionCheck)
.filter(Objects::nonNull)
.collect(Collectors.toList());
return checks.isEmpty() ? null : imd -> checks.forEach(c -> c.accept(imd));
return checks.isEmpty() ? null : (s, v) -> checks.forEach(c -> c.accept(s, v));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@
import org.elasticsearch.repositories.RepositoryData;
import org.elasticsearch.repositories.fs.FsRepository;
import org.elasticsearch.snapshots.AbstractSnapshotIntegTestCase;
import org.elasticsearch.snapshots.RestoreService;
import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.snapshots.SnapshotRestoreException;
import org.elasticsearch.snapshots.mockstore.MockRepository;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.xcontent.NamedXContentRegistry;
Expand Down Expand Up @@ -107,7 +107,8 @@ public IndexMetadata getSnapshotIndexMetaData(RepositoryData repositoryData, Sna
.put(original.getSettings())
.put(
IndexMetadata.SETTING_INDEX_VERSION_CREATED.getKey(),
randomBoolean() ? Version.fromString("5.0.0") : Version.fromString("6.0.0")
metadata.settings()
.getAsVersion("version", randomBoolean() ? Version.fromString("5.0.0") : Version.fromString("6.0.0"))
)
)
.build();
Expand All @@ -121,11 +122,7 @@ public IndexMetadata getSnapshotIndexMetaData(RepositoryData repositoryData, Sna

@Before
public void createAndRestoreArchive() throws Exception {
createRepository(
repoName,
TestRepositoryPlugin.FAKE_VERSIONS_TYPE,
randomRepositorySettings().put(RestoreService.ALLOW_BWC_INDICES_SETTING.getKey(), true)
);
createRepository(repoName, TestRepositoryPlugin.FAKE_VERSIONS_TYPE);
createIndex(indexName);
createFullSnapshot(repoName, snapshotName);

Expand Down Expand Up @@ -168,6 +165,25 @@ public void testFailRestoreOnInvalidLicense() throws Exception {
assertThat(e.getMessage(), containsString("current license is non-compliant for [archive]"));
}

public void testFailRestoreOnTooOldVersion() {
createRepository(
repoName,
TestRepositoryPlugin.FAKE_VERSIONS_TYPE,
Settings.builder().put(getRepositoryOnMaster(repoName).getMetadata().settings()).put("version", Version.fromString("2.0.0").id)
);
final RestoreSnapshotRequest req = new RestoreSnapshotRequest(repoName, snapshotName).indices(indexName).waitForCompletion(true);
SnapshotRestoreException e = expectThrows(
SnapshotRestoreException.class,
() -> client().admin().cluster().restoreSnapshot(req).actionGet()
);
assertThat(
e.getMessage(),
containsString(
"the snapshot was created with Elasticsearch version [2.0.0] " + "which isn't supported by the archive functionality"
)
);
}

// checks that shards are failed if license becomes invalid after successful restore
public void testShardAllocationOnInvalidLicense() throws Exception {
final RestoreSnapshotRequest req = new RestoreSnapshotRequest(repoName, snapshotName).indices(indexName).waitForCompletion(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDecider;
Expand Down Expand Up @@ -44,6 +43,8 @@
import org.elasticsearch.plugins.RepositoryPlugin;
import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.snapshots.Snapshot;
import org.elasticsearch.snapshots.SnapshotRestoreException;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.watcher.ResourceWatcherService;
import org.elasticsearch.xcontent.NamedXContentRegistry;
Expand All @@ -58,7 +59,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.BiConsumer;
import java.util.function.Supplier;

public class OldLuceneVersions extends Plugin implements IndexStorePlugin, ClusterPlugin, RepositoryPlugin, ActionPlugin {
Expand All @@ -69,6 +70,8 @@ public class OldLuceneVersions extends Plugin implements IndexStorePlugin, Clust
License.OperationMode.ENTERPRISE
);

private static Version MINIMUM_ARCHIVE_VERSION = Version.fromString("5.0.0");

public static boolean isArchiveIndex(Version version) {
return version.before(Version.CURRENT.minimumIndexCompatibilityVersion());
}
Expand Down Expand Up @@ -133,12 +136,20 @@ public void afterFilesRestoredFromRepository(IndexShard indexShard) {
}

@Override
public Consumer<IndexMetadata> addPreRestoreCheck() {
return indexMetadata -> {
if (isArchiveIndex(indexMetadata.getCreationVersion())) {
public BiConsumer<Snapshot, Version> addPreRestoreVersionCheck() {
return (snapshot, version) -> {
if (isArchiveIndex(version)) {
if (ARCHIVE_FEATURE.checkWithoutTracking(getLicenseState()) == false) {
throw LicenseUtils.newComplianceException("archive");
}
if (version.before(MINIMUM_ARCHIVE_VERSION)) {
throw new SnapshotRestoreException(
snapshot,
"the snapshot was created with Elasticsearch version ["
+ version
+ "] which isn't supported by the archive functionality"
);
}
}
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ public void setupIndex() throws IOException {
// register repo on new ES and restore snapshot
Request createRepoRequest2 = new Request("PUT", "/_snapshot/" + repoName);
createRepoRequest2.setJsonEntity("""
{"type":"fs","settings":{"location":"%s","allow_bwc_indices":true}}
{"type":"fs","settings":{"location":"%s"}}
""".formatted(repoLocation));
assertOK(client().performRequest(createRepoRequest2));

Expand Down
Loading