Skip to content

Commit

Permalink
MINOR: Cleanup Runnables in SnapshotsService (#35796) (#35799)
Browse files Browse the repository at this point in the history
* Simplify complex `Runnable` by moving to `AbstractRunnable`
  • Loading branch information
original-brownbear authored Nov 22, 2018
1 parent 1170050 commit f95edcb
Showing 1 changed file with 123 additions and 111 deletions.
234 changes: 123 additions & 111 deletions server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus;
Expand Down Expand Up @@ -280,9 +281,7 @@ public void onFailure(String source, Exception e) {
@Override
public void clusterStateProcessed(String source, ClusterState oldState, final ClusterState newState) {
if (newSnapshot != null) {
threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(() ->
beginSnapshot(newState, newSnapshot, request.partial(), listener)
);
beginSnapshot(newState, newSnapshot, request.partial(), listener);
}
}

Expand Down Expand Up @@ -349,125 +348,133 @@ private void beginSnapshot(final ClusterState clusterState,
final SnapshotsInProgress.Entry snapshot,
final boolean partial,
final CreateSnapshotListener userCreateSnapshotListener) {
boolean snapshotCreated = false;
try {
Repository repository = repositoriesService.repository(snapshot.snapshot().getRepository());

MetaData metaData = clusterState.metaData();
if (!snapshot.includeGlobalState()) {
// Remove global state from the cluster state
MetaData.Builder builder = MetaData.builder();
for (IndexId index : snapshot.indices()) {
builder.put(metaData.index(index.getName()), false);
}
metaData = builder.build();
}
threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(new AbstractRunnable() {

repository.initializeSnapshot(snapshot.snapshot().getSnapshotId(), snapshot.indices(), metaData);
snapshotCreated = true;
boolean snapshotCreated;

logger.info("snapshot [{}] started", snapshot.snapshot());
if (snapshot.indices().isEmpty()) {
// No indices in this snapshot - we are done
userCreateSnapshotListener.onResponse();
endSnapshot(snapshot);
return;
}
clusterService.submitStateUpdateTask("update_snapshot [" + snapshot.snapshot() + "]", new ClusterStateUpdateTask() {
@Override
protected void doRun() {
Repository repository = repositoriesService.repository(snapshot.snapshot().getRepository());

MetaData metaData = clusterState.metaData();
if (!snapshot.includeGlobalState()) {
// Remove global state from the cluster state
MetaData.Builder builder = MetaData.builder();
for (IndexId index : snapshot.indices()) {
builder.put(metaData.index(index.getName()), false);
}
metaData = builder.build();
}

SnapshotsInProgress.Entry endSnapshot;
String failure = null;
repository.initializeSnapshot(snapshot.snapshot().getSnapshotId(), snapshot.indices(), metaData);
snapshotCreated = true;

@Override
public ClusterState execute(ClusterState currentState) {
SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE);
List<SnapshotsInProgress.Entry> entries = new ArrayList<>();
for (SnapshotsInProgress.Entry entry : snapshots.entries()) {
if (entry.snapshot().equals(snapshot.snapshot()) == false) {
entries.add(entry);
continue;
}
logger.info("snapshot [{}] started", snapshot.snapshot());
if (snapshot.indices().isEmpty()) {
// No indices in this snapshot - we are done
userCreateSnapshotListener.onResponse();
endSnapshot(snapshot);
return;
}
clusterService.submitStateUpdateTask("update_snapshot [" + snapshot.snapshot() + "]", new ClusterStateUpdateTask() {

SnapshotsInProgress.Entry endSnapshot;
String failure;

@Override
public ClusterState execute(ClusterState currentState) {
SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE);
List<SnapshotsInProgress.Entry> entries = new ArrayList<>();
for (SnapshotsInProgress.Entry entry : snapshots.entries()) {
if (entry.snapshot().equals(snapshot.snapshot()) == false) {
entries.add(entry);
continue;
}

if (entry.state() != State.ABORTED) {
// Replace the snapshot that was just intialized
ImmutableOpenMap<ShardId, SnapshotsInProgress.ShardSnapshotStatus> shards = shards(currentState, entry.indices());
if (!partial) {
Tuple<Set<String>, Set<String>> indicesWithMissingShards = indicesWithMissingShards(shards, currentState.metaData());
Set<String> missing = indicesWithMissingShards.v1();
Set<String> closed = indicesWithMissingShards.v2();
if (missing.isEmpty() == false || closed.isEmpty() == false) {
endSnapshot = new SnapshotsInProgress.Entry(entry, State.FAILED, shards);
entries.add(endSnapshot);

final StringBuilder failureMessage = new StringBuilder();
if (missing.isEmpty() == false) {
failureMessage.append("Indices don't have primary shards ");
failureMessage.append(missing);
}
if (closed.isEmpty() == false) {
if (failureMessage.length() > 0) {
failureMessage.append("; ");
if (entry.state() != State.ABORTED) {
// Replace the snapshot that was just intialized
ImmutableOpenMap<ShardId, SnapshotsInProgress.ShardSnapshotStatus> shards = shards(currentState, entry.indices());
if (!partial) {
Tuple<Set<String>, Set<String>> indicesWithMissingShards = indicesWithMissingShards(shards, currentState.metaData());
Set<String> missing = indicesWithMissingShards.v1();
Set<String> closed = indicesWithMissingShards.v2();
if (missing.isEmpty() == false || closed.isEmpty() == false) {
endSnapshot = new SnapshotsInProgress.Entry(entry, State.FAILED, shards);
entries.add(endSnapshot);

final StringBuilder failureMessage = new StringBuilder();
if (missing.isEmpty() == false) {
failureMessage.append("Indices don't have primary shards ");
failureMessage.append(missing);
}
if (closed.isEmpty() == false) {
if (failureMessage.length() > 0) {
failureMessage.append("; ");
}
failureMessage.append("Indices are closed ");
failureMessage.append(closed);
}
failureMessage.append("Indices are closed ");
failureMessage.append(closed);
failure = failureMessage.toString();
continue;
}
failure = failureMessage.toString();
continue;
}
SnapshotsInProgress.Entry updatedSnapshot = new SnapshotsInProgress.Entry(entry, State.STARTED, shards);
entries.add(updatedSnapshot);
if (completed(shards.values())) {
endSnapshot = updatedSnapshot;
}
} else {
assert entry.state() == State.ABORTED : "expecting snapshot to be aborted during initialization";
failure = "snapshot was aborted during initialization";
endSnapshot = entry;
entries.add(endSnapshot);
}
SnapshotsInProgress.Entry updatedSnapshot = new SnapshotsInProgress.Entry(entry, State.STARTED, shards);
entries.add(updatedSnapshot);
if (completed(shards.values())) {
endSnapshot = updatedSnapshot;
}
} else {
assert entry.state() == State.ABORTED : "expecting snapshot to be aborted during initialization";
failure = "snapshot was aborted during initialization";
endSnapshot = entry;
entries.add(endSnapshot);
}
return ClusterState.builder(currentState)
.putCustom(SnapshotsInProgress.TYPE, new SnapshotsInProgress(Collections.unmodifiableList(entries)))
.build();
}
return ClusterState.builder(currentState)
.putCustom(SnapshotsInProgress.TYPE, new SnapshotsInProgress(Collections.unmodifiableList(entries)))
.build();
}

@Override
public void onFailure(String source, Exception e) {
logger.warn(() -> new ParameterizedMessage("[{}] failed to create snapshot", snapshot.snapshot().getSnapshotId()), e);
removeSnapshotFromClusterState(snapshot.snapshot(), null, e, new CleanupAfterErrorListener(snapshot, true, userCreateSnapshotListener, e));
}

@Override
public void onNoLongerMaster(String source) {
// We are not longer a master - we shouldn't try to do any cleanup
// The new master will take care of it
logger.warn("[{}] failed to create snapshot - no longer a master", snapshot.snapshot().getSnapshotId());
userCreateSnapshotListener.onFailure(
new SnapshotException(snapshot.snapshot(), "master changed during snapshot initialization"));
}
@Override
public void onFailure(String source, Exception e) {
logger.warn(() -> new ParameterizedMessage("[{}] failed to create snapshot", snapshot.snapshot().getSnapshotId()), e);
removeSnapshotFromClusterState(snapshot.snapshot(), null, e, new CleanupAfterErrorListener(snapshot, true, userCreateSnapshotListener, e));
}

@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
// The userCreateSnapshotListener.onResponse() notifies caller that the snapshot was accepted
// for processing. If client wants to wait for the snapshot completion, it can register snapshot
// completion listener in this method. For the snapshot completion to work properly, the snapshot
// should still exist when listener is registered.
userCreateSnapshotListener.onResponse();
@Override
public void onNoLongerMaster(String source) {
// We are not longer a master - we shouldn't try to do any cleanup
// The new master will take care of it
logger.warn("[{}] failed to create snapshot - no longer a master", snapshot.snapshot().getSnapshotId());
userCreateSnapshotListener.onFailure(
new SnapshotException(snapshot.snapshot(), "master changed during snapshot initialization"));
}

// Now that snapshot completion listener is registered we can end the snapshot if needed
// We should end snapshot only if 1) we didn't accept it for processing (which happens when there
// is nothing to do) and 2) there was a snapshot in metadata that we should end. Otherwise we should
// go ahead and continue working on this snapshot rather then end here.
if (endSnapshot != null) {
endSnapshot(endSnapshot, failure);
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
// The userCreateSnapshotListener.onResponse() notifies caller that the snapshot was accepted
// for processing. If client wants to wait for the snapshot completion, it can register snapshot
// completion listener in this method. For the snapshot completion to work properly, the snapshot
// should still exist when listener is registered.
userCreateSnapshotListener.onResponse();

// Now that snapshot completion listener is registered we can end the snapshot if needed
// We should end snapshot only if 1) we didn't accept it for processing (which happens when there
// is nothing to do) and 2) there was a snapshot in metadata that we should end. Otherwise we should
// go ahead and continue working on this snapshot rather then end here.
if (endSnapshot != null) {
endSnapshot(endSnapshot, failure);
}
}
}
});
} catch (Exception e) {
logger.warn(() -> new ParameterizedMessage("failed to create snapshot [{}]", snapshot.snapshot().getSnapshotId()), e);
removeSnapshotFromClusterState(snapshot.snapshot(), null, e, new CleanupAfterErrorListener(snapshot, snapshotCreated, userCreateSnapshotListener, e));
}
});
}

@Override
public void onFailure(Exception e) {
logger.warn(() -> new ParameterizedMessage("failed to create snapshot [{}]", snapshot.snapshot().getSnapshotId()), e);
removeSnapshotFromClusterState(snapshot.snapshot(), null, e, new CleanupAfterErrorListener(snapshot, snapshotCreated, userCreateSnapshotListener, e));
}
});
}

private class CleanupAfterErrorListener implements ActionListener<SnapshotInfo> {
Expand Down Expand Up @@ -958,9 +965,10 @@ void endSnapshot(final SnapshotsInProgress.Entry entry) {
* @param failure failure reason or null if snapshot was successful
*/
private void endSnapshot(final SnapshotsInProgress.Entry entry, final String failure) {
threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(() -> {
final Snapshot snapshot = entry.snapshot();
try {
threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(new AbstractRunnable() {
@Override
protected void doRun() {
final Snapshot snapshot = entry.snapshot();
final Repository repository = repositoriesService.repository(snapshot.getRepository());
logger.trace("[{}] finalizing snapshot in repository, state: [{}], failure[{}]", snapshot, entry.state(), failure);
ArrayList<SnapshotShardFailure> shardFailures = new ArrayList<>();
Expand All @@ -982,7 +990,11 @@ private void endSnapshot(final SnapshotsInProgress.Entry entry, final String fai
entry.includeGlobalState());
removeSnapshotFromClusterState(snapshot, snapshotInfo, null);
logger.info("snapshot [{}] completed with state [{}]", snapshot, snapshotInfo.state());
} catch (Exception e) {
}

@Override
public void onFailure(final Exception e) {
Snapshot snapshot = entry.snapshot();
logger.warn(() -> new ParameterizedMessage("[{}] failed to finalize snapshot", snapshot), e);
removeSnapshotFromClusterState(snapshot, null, e);
}
Expand Down

0 comments on commit f95edcb

Please sign in to comment.