Skip to content

Commit

Permalink
Write state also on data nodes if not master eligible
Browse files Browse the repository at this point in the history
When a node was a data-only node, its index state was not written.
If such a node then connected to a master that did not have the index
in its cluster state — for example because the master was restarted
and its data folder was lost — the indices were not imported as
dangling but were instead deleted.
This commit makes sure that index state is also written on data nodes,
as long as they hold at least one allocated shard of the index.

closes elastic#8823
  • Loading branch information
brwe committed Mar 10, 2015
1 parent d9c19cd commit b583799
Show file tree
Hide file tree
Showing 5 changed files with 450 additions and 10 deletions.
16 changes: 16 additions & 0 deletions src/main/java/org/elasticsearch/cluster/ClusterChangedEvent.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.logging.ESLoggerFactory;

import java.util.Arrays;
import java.util.List;
Expand Down Expand Up @@ -105,6 +106,9 @@ public List<String> indicesCreated() {
* Returns the indices deleted in this event
*/
public List<String> indicesDeleted() {
if (newMaster()) {
return ImmutableList.of();
}
if (previousState == null) {
return ImmutableList.of();
}
Expand Down Expand Up @@ -165,4 +169,16 @@ public boolean nodesAdded() {
/**
 * Returns <code>true</code> if nodes were either removed from or added to the cluster
 * between the previous and the current cluster state.
 */
public boolean nodesChanged() {
    final boolean removed = nodesRemoved();
    return removed || nodesAdded();
}

/**
 * Returns <code>true</code> if the master node changed between the previous and the
 * current cluster state. A newly elected master (previous state had none) counts as a
 * change; no master in either state does not.
 */
public boolean newMaster() {
    String oldMaster = previousState().getNodes().masterNodeId();
    String newMaster = state().getNodes().masterNodeId();
    if (oldMaster == null) {
        // no master before: it only changed if one has now been elected
        return newMaster != null;
    }
    // compare the already captured ids instead of re-reading both cluster states;
    // equals(null) is false, so losing the master also reports a change
    return oldMaster.equals(newMaster) == false;
}
}
71 changes: 65 additions & 6 deletions src/main/java/org/elasticsearch/gateway/GatewayMetaState.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@

package org.elasticsearch.gateway;

import com.carrotsearch.hppc.cursors.IntObjectCursor;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.Version;
Expand All @@ -28,16 +31,16 @@
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.DjbHashFunction;
import org.elasticsearch.cluster.routing.HashFunction;
import org.elasticsearch.cluster.routing.SimpleHashFunction;
import org.elasticsearch.cluster.routing.*;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Preconditions;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.index.Index;

import java.io.IOException;
import java.nio.file.DirectoryStream;
Expand Down Expand Up @@ -72,7 +75,7 @@ public GatewayMetaState(Settings settings, NodeEnvironment nodeEnv, MetaStateSer
if (DiscoveryNode.masterNode(settings) || DiscoveryNode.dataNode(settings)) {
nodeEnv.ensureAtomicMoveSupported();
}
if (DiscoveryNode.masterNode(settings)) {
if (DiscoveryNode.masterNode(settings) || DiscoveryNode.dataNode(settings)) {
try {
ensureNoPre019State();
pre20Upgrade();
Expand Down Expand Up @@ -103,8 +106,8 @@ public void clusterChanged(ClusterChangedEvent event) {
// we don't check if metaData changed, since we might be called several times and we need to check dangling...

boolean success = true;
// only applied to master node, writing the global and index level states
if (state.nodes().localNode().masterNode()) {
// write the state if this node is a master eligible node or if it is a data node and has shards allocated on it
if (state.nodes().localNode().masterNode() || state.nodes().localNode().dataNode()) {
// check if the global state changed?
if (currentMetaData == null || !MetaData.isGlobalStateEquals(currentMetaData, newMetaData)) {
try {
Expand All @@ -116,6 +119,22 @@ public void clusterChanged(ClusterChangedEvent event) {

// check and write changes in indices
for (IndexMetaData indexMetaData : newMetaData) {

boolean shardsAllocatedOnThisNodeInLastClusterState = true;
if (isDataOnlyNode(state)) {
boolean shardsCurrentlyAllocatedOnThisNode = shardsAllocatedOnLocalNode(state, indexMetaData);
shardsAllocatedOnThisNodeInLastClusterState = shardsAllocatedOnLocalNode(event.previousState(), indexMetaData);

if (shardsCurrentlyAllocatedOnThisNode == false) {
// remove the index state for this index if it is only a data node
// only delete if the last shard was removed
if (shardsAllocatedOnThisNodeInLastClusterState) {
removeIndexState(indexMetaData);
}
// nothing left to do, we do not write the index state for data only nodes if they do not have shards allocated on them
continue;
}
}
String writeReason = null;
IndexMetaData currentIndexMetaData;
if (currentMetaData == null) {
Expand All @@ -132,6 +151,9 @@ public void clusterChanged(ClusterChangedEvent event) {
writeReason = "freshly created";
} else if (currentIndexMetaData.version() != indexMetaData.version()) {
writeReason = "version changed from [" + currentIndexMetaData.version() + "] to [" + indexMetaData.version() + "]";
} else if (shardsAllocatedOnThisNodeInLastClusterState == false && isDataOnlyNode(state)) {
// shard was newly allocated because it was not allocated in last cluster state but is now
writeReason = "shard allocated on data only node";
}

// we update the writeReason only if we really need to write it
Expand All @@ -154,6 +176,43 @@ public void clusterChanged(ClusterChangedEvent event) {
}
}

/**
 * Checks whether at least one shard (primary or replica) of the given index is allocated
 * on the local node in the given cluster state.
 *
 * @param state         the cluster state whose routing table is inspected
 * @param indexMetaData the index to look for
 * @return <code>true</code> if any shard copy of the index is assigned to the local node
 */
protected boolean shardsAllocatedOnLocalNode(ClusterState state, IndexMetaData indexMetaData) {
    IndexRoutingTable indexRoutingTable = state.getRoutingTable().index(indexMetaData.index());
    if (indexRoutingTable == null) {
        // the index has no routing table (yet), so nothing can be allocated here
        return false;
    }
    final String localNodeId = state.nodes().localNode().getId();
    // iterate over all shards of the index and return as soon as one is found on this node
    for (IntObjectCursor<IndexShardRoutingTable> cursor : indexRoutingTable.shards()) {
        for (ShardRouting shardRouting : cursor.value.shards()) {
            String currentNodeId = shardRouting.currentNodeId();
            if (currentNodeId != null && currentNodeId.equals(localNodeId)) {
                return true;
            }
        }
    }
    return false;
}

/**
 * Returns <code>true</code> if the local node holds data but is not master eligible.
 */
protected boolean isDataOnlyNode(ClusterState state) {
    DiscoveryNode localNode = state.nodes().localNode();
    return localNode.masterNode() == false && localNode.dataNode();
}

/**
 * Deletes the on-disk state files of the given index from all data paths of this node.
 * <p>
 * Used on data-only nodes once no shard of the index is allocated locally anymore, so a
 * stale index state file is not picked up as a dangling index later on.
 */
private void removeIndexState(IndexMetaData indexMetaData) {
final MetaDataStateFormat<IndexMetaData> writer = indexStateFormat(format, formatParams, true);
try {
Path[] locations = nodeEnv.indexPaths(new Index(indexMetaData.index()));
Preconditions.checkArgument(locations != null, "Locations must not be null");
Preconditions.checkArgument(locations.length > 0, "One or more locations required");
// passing null as the current state file means no file is kept: every file with the
// index state prefix in these locations is removed
writer.cleanupOldFiles(INDEX_STATE_FILE_PREFIX, null, locations);
logger.debug("successfully deleted state for {}", indexMetaData.getIndex());
} catch (Throwable ex) {
// best effort: a leftover state file may cause the index to be re-imported as
// dangling, but the failure must not break applying the cluster state
logger.warn("[{}]: failed to delete index state", ex, indexMetaData.index());
}
}

/**
* Throws an IAE if a pre 0.19 state is detected
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,14 +190,14 @@ protected Directory newDirectory(Path dir) throws IOException {
return new SimpleFSDirectory(dir);
}

private void cleanupOldFiles(final String prefix, final String currentStateFile, Path[] locations) throws IOException {
protected void cleanupOldFiles(final String prefix, final String currentStateFile, Path[] locations) throws IOException {
final DirectoryStream.Filter<Path> filter = new DirectoryStream.Filter<Path>() {
@Override
public boolean accept(Path entry) throws IOException {
final String entryFileName = entry.getFileName().toString();
return Files.isRegularFile(entry)
&& entryFileName.startsWith(prefix) // only state files
&& currentStateFile.equals(entryFileName) == false; // keep the current state file around
&& entryFileName.equals(currentStateFile) == false; // keep the current state file around
}
};
// now clean up the old files
Expand Down
Loading

0 comments on commit b583799

Please sign in to comment.