Skip to content

Commit

Permalink
Implement basic CcrRepository restore (#36287) (#36551)
Browse files Browse the repository at this point in the history
This is related to #35975. It implements a basic restore functionality
for the CcrRepository. When the restore process is kicked off, it
configures the new index as expected for a follower index. This means
that the index has a different uuid, the version is not incremented, and
the Ccr metadata is installed.

When the restore shard method is called, an empty shard is initialized.
  • Loading branch information
Tim-Brooks authored Dec 12, 2018
1 parent ff7c59d commit f438834
Show file tree
Hide file tree
Showing 6 changed files with 324 additions and 78 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,10 @@ public SnapshotInfo(SnapshotId snapshotId, List<String> indices, SnapshotState s
this(snapshotId, indices, state, null, null, 0L, 0L, 0, 0, Collections.emptyList(), null);
}

public SnapshotInfo(SnapshotId snapshotId, List<String> indices, SnapshotState state, Version version) {
this(snapshotId, indices, state, null, version, 0L, 0L, 0, 0, Collections.emptyList(), null);
}

public SnapshotInfo(SnapshotId snapshotId, List<String> indices, long startTime, Boolean includeGlobalState) {
this(snapshotId, indices, SnapshotState.IN_PROGRESS, null, Version.CURRENT, startTime, 0L, 0, 0,
Collections.emptyList(), includeGlobalState);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
import org.elasticsearch.common.util.PageCacheRecycler;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.discovery.DiscoveryModule;
import org.elasticsearch.discovery.zen.ElectMasterService;
Expand Down Expand Up @@ -1454,6 +1455,10 @@ public synchronized <T> Iterable<T> getDataNodeInstances(Class<T> clazz) {
return getInstances(clazz, new DataNodePredicate());
}

public synchronized <T> T getCurrentMasterNodeInstance(Class<T> clazz) {
return getInstance(clazz, new NodeNamePredicate(getMasterName()));
}

/**
* Returns an Iterable to all instances for the given class &gt;T&lt; across all data and master nodes
* in the cluster.
Expand Down Expand Up @@ -2208,25 +2213,26 @@ public NamedWriteableRegistry getNamedWriteableRegistry() {
/**
* Returns a predicate that only accepts settings of nodes with one of the given names.
*/
public static Predicate<Settings> nameFilter(String... nodeName) {
return new NodeNamePredicate(new HashSet<>(Arrays.asList(nodeName)));
public static Predicate<Settings> nameFilter(String... nodeNames) {
final Set<String> nodes = Sets.newHashSet(nodeNames);
return settings -> nodes.contains(settings.get("node.name"));
}

private static final class NodeNamePredicate implements Predicate<Settings> {
private static final class NodeNamePredicate implements Predicate<NodeAndClient> {
private final HashSet<String> nodeNames;

NodeNamePredicate(HashSet<String> nodeNames) {
this.nodeNames = nodeNames;
NodeNamePredicate(String... nodeNames) {
this.nodeNames = Sets.newHashSet(nodeNames);
}

@Override
public boolean test(Settings settings) {
return nodeNames.contains(settings.get("node.name"));

public boolean test(NodeAndClient nodeAndClient) {
return nodeNames.contains(nodeAndClient.getName());
}
}



/**
* An abstract class that is called during {@link #rollingRestart(InternalTestCluster.RestartCallback)}
* and / or {@link #fullRestart(InternalTestCluster.RestartCallback)} to execute actions at certain
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ public class Ccr extends Plugin implements ActionPlugin, PersistentTaskPlugin, E
private final Settings settings;
private final CcrLicenseChecker ccrLicenseChecker;
private final SetOnce<CcrRepositoryManager> repositoryManager = new SetOnce<>();
private Client client;

private final boolean tribeNode;
private final boolean tribeNodeClient;
Expand Down Expand Up @@ -153,6 +154,7 @@ public Collection<Object> createComponents(
final Environment environment,
final NodeEnvironment nodeEnvironment,
final NamedWriteableRegistry namedWriteableRegistry) {
this.client = client;
if (enabled == false) {
return emptyList();
}
Expand Down Expand Up @@ -282,7 +284,7 @@ public List<ExecutorBuilder<?>> getExecutorBuilders(Settings settings) {

@Override
public Map<String, Repository.Factory> getInternalRepositories(Environment env, NamedXContentRegistry namedXContentRegistry) {
Repository.Factory repositoryFactory = (metadata) -> new CcrRepository(metadata, settings);
Repository.Factory repositoryFactory = (metadata) -> new CcrRepository(metadata, client, ccrLicenseChecker, settings);
return Collections.singletonMap(CcrRepository.TYPE, repositoryFactory);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,21 @@

import org.apache.lucene.index.IndexCommit;
import org.elasticsearch.Version;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.engine.EngineException;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardRecoveryException;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus;
import org.elasticsearch.index.store.Store;
Expand All @@ -25,24 +33,41 @@
import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.snapshots.SnapshotInfo;
import org.elasticsearch.snapshots.SnapshotShardFailure;
import org.elasticsearch.snapshots.SnapshotState;
import org.elasticsearch.xpack.ccr.Ccr;
import org.elasticsearch.xpack.ccr.CcrLicenseChecker;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

/**
* This repository relies on a remote cluster for Ccr restores. It is read-only so it can only be used to
* restore shards/indexes that exist on the remote cluster.
*/
public class CcrRepository extends AbstractLifecycleComponent implements Repository {

public static final String LATEST = "_latest_";
public static final String TYPE = "_ccr_";
public static final String NAME_PREFIX = "_ccr_";
private static final SnapshotId SNAPSHOT_ID = new SnapshotId(LATEST, LATEST);

private final RepositoryMetaData metadata;
private final String remoteClusterAlias;
private final Client client;
private final CcrLicenseChecker ccrLicenseChecker;

public CcrRepository(RepositoryMetaData metadata, Settings settings) {
public CcrRepository(RepositoryMetaData metadata, Client client, CcrLicenseChecker ccrLicenseChecker, Settings settings) {
super(settings);
this.metadata = metadata;
assert metadata.name().startsWith(NAME_PREFIX) : "CcrRepository metadata.name() must start with: " + NAME_PREFIX;
this.remoteClusterAlias = Strings.split(metadata.name(), NAME_PREFIX)[1];
this.ccrLicenseChecker = ccrLicenseChecker;
this.client = client;
}

@Override
Expand All @@ -67,22 +92,85 @@ public RepositoryMetaData getMetadata() {

@Override
public SnapshotInfo getSnapshotInfo(SnapshotId snapshotId) {
throw new UnsupportedOperationException("Unsupported for repository of type: " + TYPE);
assert SNAPSHOT_ID.equals(snapshotId) : "RemoteClusterRepository only supports " + SNAPSHOT_ID + " as the SnapshotId";
Client remoteClient = client.getRemoteClusterClient(remoteClusterAlias);
ClusterStateResponse response = remoteClient.admin().cluster().prepareState().clear().setMetaData(true).setNodes(true).get();
ImmutableOpenMap<String, IndexMetaData> indicesMap = response.getState().metaData().indices();
ArrayList<String> indices = new ArrayList<>(indicesMap.size());
indicesMap.keysIt().forEachRemaining(indices::add);

return new SnapshotInfo(snapshotId, indices, SnapshotState.SUCCESS, response.getState().getNodes().getMaxNodeVersion());
}

@Override
public MetaData getSnapshotGlobalMetaData(SnapshotId snapshotId) {
throw new UnsupportedOperationException("Unsupported for repository of type: " + TYPE);
assert SNAPSHOT_ID.equals(snapshotId) : "RemoteClusterRepository only supports " + SNAPSHOT_ID + " as the SnapshotId";
Client remoteClient = client.getRemoteClusterClient(remoteClusterAlias);
ClusterStateResponse response = remoteClient
.admin()
.cluster()
.prepareState()
.clear()
.setMetaData(true)
.setIndices("dummy_index_name") // We set a single dummy index name to avoid fetching all the index data
.get();
return response.getState().metaData();
}

@Override
public IndexMetaData getSnapshotIndexMetaData(SnapshotId snapshotId, IndexId index) throws IOException {
throw new UnsupportedOperationException("Unsupported for repository of type: " + TYPE);
assert SNAPSHOT_ID.equals(snapshotId) : "RemoteClusterRepository only supports " + SNAPSHOT_ID + " as the SnapshotId";
String leaderIndex = index.getName();
Client remoteClient = client.getRemoteClusterClient(remoteClusterAlias);

ClusterStateResponse response = remoteClient
.admin()
.cluster()
.prepareState()
.clear()
.setMetaData(true)
.setIndices(leaderIndex)
.get();

// Validates whether the leader cluster has been configured properly:
PlainActionFuture<String[]> future = PlainActionFuture.newFuture();
IndexMetaData leaderIndexMetaData = response.getState().metaData().index(leaderIndex);
ccrLicenseChecker.fetchLeaderHistoryUUIDs(remoteClient, leaderIndexMetaData, future::onFailure, future::onResponse);
String[] leaderHistoryUUIDs = future.actionGet();

IndexMetaData.Builder imdBuilder = IndexMetaData.builder(leaderIndexMetaData);
// Adding the leader index uuid for each shard as custom metadata:
Map<String, String> metadata = new HashMap<>();
metadata.put(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_SHARD_HISTORY_UUIDS, String.join(",", leaderHistoryUUIDs));
metadata.put(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_UUID_KEY, leaderIndexMetaData.getIndexUUID());
metadata.put(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_NAME_KEY, leaderIndexMetaData.getIndex().getName());
metadata.put(Ccr.CCR_CUSTOM_METADATA_REMOTE_CLUSTER_NAME_KEY, remoteClusterAlias);
imdBuilder.putCustom(Ccr.CCR_CUSTOM_METADATA_KEY, metadata);

return imdBuilder.build();
}

@Override
public RepositoryData getRepositoryData() {
throw new UnsupportedOperationException("Unsupported for repository of type: " + TYPE);
Client remoteClient = client.getRemoteClusterClient(remoteClusterAlias);
ClusterStateResponse response = remoteClient.admin().cluster().prepareState().clear().setMetaData(true).get();
MetaData remoteMetaData = response.getState().getMetaData();

Map<String, SnapshotId> copiedSnapshotIds = new HashMap<>();
Map<String, SnapshotState> snapshotStates = new HashMap<>(copiedSnapshotIds.size());
Map<IndexId, Set<SnapshotId>> indexSnapshots = new HashMap<>(copiedSnapshotIds.size());

ImmutableOpenMap<String, IndexMetaData> remoteIndices = remoteMetaData.getIndices();
for (String indexName : remoteMetaData.getConcreteAllIndices()) {
// Both the Snapshot name and UUID are set to _latest_
SnapshotId snapshotId = new SnapshotId(LATEST, LATEST);
copiedSnapshotIds.put(indexName, snapshotId);
snapshotStates.put(indexName, SnapshotState.SUCCESS);
Index index = remoteIndices.get(indexName).getIndex();
indexSnapshots.put(new IndexId(indexName, index.getUUID()), Collections.singleton(snapshotId));
}

return new RepositoryData(1, copiedSnapshotIds, snapshotStates, indexSnapshots, Collections.emptyList());
}

@Override
Expand Down Expand Up @@ -137,9 +225,17 @@ public void snapshotShard(IndexShard shard, Store store, SnapshotId snapshotId,
}

@Override
public void restoreShard(IndexShard shard, SnapshotId snapshotId, Version version, IndexId indexId, ShardId snapshotShardId,
public void restoreShard(IndexShard indexShard, SnapshotId snapshotId, Version version, IndexId indexId, ShardId shardId,
RecoveryState recoveryState) {
throw new UnsupportedOperationException("Unsupported for repository of type: " + TYPE);
final Store store = indexShard.store();
store.incRef();
try {
store.createEmpty();
} catch (EngineException | IOException e) {
throw new IndexShardRecoveryException(shardId, "failed to recover from gateway", e);
} finally {
store.decRef();
}
}

@Override
Expand Down
Loading

0 comments on commit f438834

Please sign in to comment.