Skip to content

Commit

Permalink
Write state also on data nodes if not master eligible
Browse files Browse the repository at this point in the history
When a node was a data node only then the index state was not written.
In case this node connected to a master that did not have the index
in the cluster state, for example because a master was restarted and
the data folder was lost, then the indices were not imported as dangling
but instead deleted.
This commit makes sure that index state for data nodes is also written
if they have at least one shard of this index allocated.

closes #8823
closes #9952
  • Loading branch information
brwe committed Apr 29, 2015
1 parent 3a2725b commit c3a1729
Show file tree
Hide file tree
Showing 3 changed files with 749 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@

package org.elasticsearch.gateway.local.state.meta;

import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterChangedEvent;
Expand All @@ -30,6 +32,8 @@
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.MutableShardRouting;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
Expand All @@ -48,11 +52,8 @@
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.*;
import java.util.concurrent.ScheduledFuture;
import java.util.regex.Pattern;

/**
*
Expand All @@ -62,6 +63,7 @@ public class LocalGatewayMetaState extends AbstractComponent implements ClusterS
static final String GLOBAL_STATE_FILE_PREFIX = "global-";
private static final String INDEX_STATE_FILE_PREFIX = "state-";
private static final String GLOBAL_STATE_LOG_TYPE = "[_global]";

static enum AutoImportDangledState {
NO() {
@Override
Expand Down Expand Up @@ -103,7 +105,9 @@ public static AutoImportDangledState fromString(String value) {
private final LocalAllocateDangledIndices allocateDangledIndices;

@Nullable
private volatile MetaData currentMetaData;
private volatile MetaData previousMetaData;

private volatile ImmutableSet<String> previouslyWrittenIndices = ImmutableSet.of();

private final XContentType format;
private final ToXContent.Params formatParams;
Expand Down Expand Up @@ -157,7 +161,7 @@ public LocalGatewayMetaState(Settings settings, ThreadPool threadPool, NodeEnvir
if (DiscoveryNode.masterNode(settings) || DiscoveryNode.dataNode(settings)) {
nodeEnv.ensureAtomicMoveSupported();
}
if (DiscoveryNode.masterNode(settings)) {
if (DiscoveryNode.masterNode(settings) || DiscoveryNode.dataNode(settings)) {
try {
pre019Upgrade();
long start = System.currentTimeMillis();
Expand All @@ -178,51 +182,58 @@ public MetaData loadMetaState() throws Exception {

@Override
public void clusterChanged(ClusterChangedEvent event) {

Set<String> relevantIndices = new HashSet<>();
final ClusterState state = event.state();
if (state.blocks().disableStatePersistence()) {
// reset the current metadata, we need to start fresh...
this.currentMetaData = null;
this.previousMetaData = null;
previouslyWrittenIndices = ImmutableSet.of();
return;
}

MetaData newMetaData = state.metaData();
// we don't check if metaData changed, since we might be called several times and we need to check dangling...

boolean success = true;
// only applied to master node, writing the global and index level states
if (state.nodes().localNode().masterNode()) {
// write the state if this node is a master eligible node or if it is a data node and has shards allocated on it
if (state.nodes().localNode().masterNode() || state.nodes().localNode().dataNode()) {
// check if the global state changed?
if (currentMetaData == null || !MetaData.isGlobalStateEquals(currentMetaData, newMetaData)) {
if (previousMetaData == null || !MetaData.isGlobalStateEquals(previousMetaData, newMetaData)) {
try {
writeGlobalState("changed", newMetaData);
// we determine if or if not we write meta data on data only nodes by looking at the shard routing
// and only write if a shard of this index is allocated on this node
// however, closed indices do not appear in the shard routing. if the meta data for a closed index is
// updated it will therefore not be written in case the list of previouslyWrittenIndices is empty (because state
// persistence was disabled or the node was restarted), see getRelevantIndicesOnDataOnlyNode().
// we therefore have to check here if we have shards on disk and add their indices to the previouslyWrittenIndices list
if (isDataOnlyNode(state)) {
ImmutableSet.Builder<String> previouslyWrittenIndicesBuilder = ImmutableSet.builder();
for (IndexMetaData indexMetaData : newMetaData) {
IndexMetaData indexMetaDataOnDisk = null;
if (indexMetaData.state().equals(IndexMetaData.State.CLOSE)) {
indexMetaDataOnDisk = loadIndexState(indexMetaData.index());

}
if (indexMetaDataOnDisk != null) {
previouslyWrittenIndicesBuilder.add(indexMetaDataOnDisk.index());
}
}
previouslyWrittenIndices = previouslyWrittenIndicesBuilder.addAll(previouslyWrittenIndices).build();
}
} catch (Throwable e) {
success = false;
}
}

// check and write changes in indices
for (IndexMetaData indexMetaData : newMetaData) {
String writeReason = null;
IndexMetaData currentIndexMetaData;
if (currentMetaData == null) {
// a new event..., check from the state stored
currentIndexMetaData = loadIndexState(indexMetaData.index());
} else {
currentIndexMetaData = currentMetaData.index(indexMetaData.index());
}
if (currentIndexMetaData == null) {
writeReason = "freshly created";
} else if (currentIndexMetaData.version() != indexMetaData.version()) {
writeReason = "version changed from [" + currentIndexMetaData.version() + "] to [" + indexMetaData.version() + "]";
}

// we update the writeReason only if we really need to write it
if (writeReason == null) {
continue;
}

Iterable<IndexMetaWriteInfo> writeInfo;
relevantIndices = getRelevantIndices(event.state(), previouslyWrittenIndices);
writeInfo = resolveStatesToBeWritten(previouslyWrittenIndices, relevantIndices, previousMetaData, event.state().metaData());
for (IndexMetaWriteInfo indexMetaWrite : writeInfo) {
try {
writeIndex(writeReason, indexMetaData, currentIndexMetaData);
writeIndex(indexMetaWrite.reason, indexMetaWrite.newMetaData, indexMetaWrite.previousMetaData);
} catch (Throwable e) {
success = false;
}
Expand Down Expand Up @@ -253,7 +264,7 @@ public void clusterChanged(ClusterChangedEvent event) {
}
final IndexMetaData indexMetaData = loadIndexState(indexName);
if (indexMetaData != null) {
if(autoImportDangled.shouldImport()){
if (autoImportDangled.shouldImport()) {
logger.info("[{}] dangling index, exists on local file system, but not in cluster metadata, auto import to cluster state [{}]", indexName, autoImportDangled);
danglingIndices.put(indexName, new DanglingIndex(indexName, null));
} else if (danglingTimeout.millis() == 0) {
Expand Down Expand Up @@ -314,8 +325,104 @@ public void onFailure(Throwable e) {
}

if (success) {
currentMetaData = newMetaData;
previousMetaData = newMetaData;
ImmutableSet.Builder<String> builder = ImmutableSet.builder();
previouslyWrittenIndices = builder.addAll(relevantIndices).build();
}
}

public static Set<String> getRelevantIndices(ClusterState state, ImmutableSet<String> previouslyWrittenIndices) {
Set<String> relevantIndices;
if (isDataOnlyNode(state)) {
relevantIndices = getRelevantIndicesOnDataOnlyNode(state, previouslyWrittenIndices);
} else if (state.nodes().localNode().masterNode() == true) {
relevantIndices = getRelevantIndicesForMasterEligibleNode(state);
} else {
relevantIndices = Collections.emptySet();
}
return relevantIndices;
}

/**
* Loads the current meta state for each index in the new cluster state and checks if it has to be persisted.
* Each index state that should be written to disk will be returned. This is only run for data only nodes.
* It will return only the states for indices that actually have a shard allocated on the current node.
*
* @param previouslyWrittenIndices A list of indices for which the state was already written before
* @param potentiallyUnwrittenIndices The list of indices for which state should potentially be written
* @param previousMetaData The last meta data we know of. meta data for all indices in previouslyWrittenIndices list is persisted now
* @param newMetaData The new metadata
* @return iterable over all indices states that should be written to disk
*/
public static Iterable<LocalGatewayMetaState.IndexMetaWriteInfo> resolveStatesToBeWritten(ImmutableSet<String> previouslyWrittenIndices, Set<String> potentiallyUnwrittenIndices, MetaData previousMetaData, MetaData newMetaData) {
List<LocalGatewayMetaState.IndexMetaWriteInfo> indicesToWrite = new ArrayList<>();
for (String index : potentiallyUnwrittenIndices) {
IndexMetaData newIndexMetaData = newMetaData.index(index);
IndexMetaData previousIndexMetaData = previousMetaData == null ? null : previousMetaData.index(index);
String writeReason = null;
if (previouslyWrittenIndices.contains(index) == false || previousIndexMetaData == null) {
writeReason = "freshly created";
} else if (previousIndexMetaData.version() != newIndexMetaData.version()) {
writeReason = "version changed from [" + previousIndexMetaData.version() + "] to [" + newIndexMetaData.version() + "]";
}
if (writeReason != null) {
indicesToWrite.add(new LocalGatewayMetaState.IndexMetaWriteInfo(newIndexMetaData, previousIndexMetaData, writeReason));
}
}
return indicesToWrite;
}

public static Set<String> getRelevantIndicesOnDataOnlyNode(ClusterState state, ImmutableSet<String> previouslyWrittenIndices) {
RoutingNode newRoutingNode = state.getRoutingNodes().node(state.nodes().localNodeId());
if (newRoutingNode == null) {
throw new IllegalStateException("cluster state does not contain this node - cannot write index meta state");
}
Set<String> indices = new HashSet<>();
for (MutableShardRouting routing : newRoutingNode) {
indices.add(routing.index());
}
// we have to check the meta data also: closed indices will not appear in the routing table, but we must still write the state if we have it written on disk previously
for (IndexMetaData indexMetaData : state.metaData()) {
if (previouslyWrittenIndices.contains(indexMetaData.getIndex()) && state.metaData().getIndices().get(indexMetaData.getIndex()).state().equals(IndexMetaData.State.CLOSE)) {
indices.add(indexMetaData.getIndex());
}
}
return indices;
}

public static Set<String> getRelevantIndicesForMasterEligibleNode(ClusterState state) {
Set<String> relevantIndices;
relevantIndices = new HashSet<>();
// we have to iterate over the metadata to make sure we also capture closed indices
for (IndexMetaData indexMetaData : state.metaData()) {
relevantIndices.add(indexMetaData.getIndex());
}
return relevantIndices;
}


public static class IndexMetaWriteInfo {
final IndexMetaData newMetaData;
final String reason;
final IndexMetaData previousMetaData;

public IndexMetaWriteInfo(IndexMetaData newMetaData, IndexMetaData previousMetaData, String reason) {
this.newMetaData = newMetaData;
this.reason = reason;
this.previousMetaData = previousMetaData;
}

public IndexMetaData getNewMetaData() {
return newMetaData;
}

public String getReason() {
return reason;
}
}

protected static boolean isDataOnlyNode(ClusterState state) {
return ((state.nodes().localNode().masterNode() == false) && state.nodes().localNode().dataNode());
}

/**
Expand Down Expand Up @@ -344,7 +451,8 @@ static MetaDataStateFormat<IndexMetaData> indexStateFormat(XContentType format,

@Override
public void toXContent(XContentBuilder builder, IndexMetaData state) throws IOException {
IndexMetaData.Builder.toXContent(state, builder, formatParams); }
IndexMetaData.Builder.toXContent(state, builder, formatParams);
}

@Override
public IndexMetaData fromXContent(XContentParser parser) throws IOException {
Expand Down
Loading

0 comments on commit c3a1729

Please sign in to comment.