Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix Concurrent Snapshot Ending And Stabilize Snapshot Finalization #38368

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterState.Custom;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
Expand Down Expand Up @@ -87,9 +88,11 @@ public static class Entry {
private final ImmutableOpenMap<String, List<ShardId>> waitingIndices;
private final long startTime;
private final long repositoryStateId;
@Nullable private final String failure;

public Entry(Snapshot snapshot, boolean includeGlobalState, boolean partial, State state, List<IndexId> indices,
long startTime, long repositoryStateId, ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards) {
long startTime, long repositoryStateId, ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards,
String failure) {
this.state = state;
this.snapshot = snapshot;
this.includeGlobalState = includeGlobalState;
Expand All @@ -104,15 +107,26 @@ public Entry(Snapshot snapshot, boolean includeGlobalState, boolean partial, Sta
this.waitingIndices = findWaitingIndices(shards);
}
this.repositoryStateId = repositoryStateId;
this.failure = failure;
}

public Entry(Snapshot snapshot, boolean includeGlobalState, boolean partial, State state, List<IndexId> indices,
long startTime, long repositoryStateId, ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards) {
this(snapshot, includeGlobalState, partial, state, indices, startTime, repositoryStateId, shards, null);
}

public Entry(Entry entry, State state, ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards) {
this(entry.snapshot, entry.includeGlobalState, entry.partial, state, entry.indices, entry.startTime,
entry.repositoryStateId, shards);
entry.repositoryStateId, shards, entry.failure);
}

public Entry(Entry entry, State state, ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards, String failure) {
this(entry.snapshot, entry.includeGlobalState, entry.partial, state, entry.indices, entry.startTime,
entry.repositoryStateId, shards, failure);
}

public Entry(Entry entry, ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards) {
this(entry, entry.state, shards);
this(entry, entry.state, shards, entry.failure);
}

public Snapshot snapshot() {
Expand Down Expand Up @@ -151,6 +165,10 @@ public long getRepositoryStateId() {
return repositoryStateId;
}

public String failure() {
return failure;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
Expand Down Expand Up @@ -427,14 +445,21 @@ public SnapshotsInProgress(StreamInput in) throws IOException {
}
}
long repositoryStateId = in.readLong();
final String failure;
if (in.getVersion().onOrAfter(Version.V_7_0_0)) {
failure = in.readOptionalString();
} else {
failure = null;
}
entries[i] = new Entry(snapshot,
includeGlobalState,
partial,
state,
Collections.unmodifiableList(indexBuilder),
startTime,
repositoryStateId,
builder.build());
builder.build(),
failure);
}
this.entries = Arrays.asList(entries);
}
Expand Down Expand Up @@ -463,6 +488,9 @@ public void writeTo(StreamOutput out) throws IOException {
}
}
out.writeLong(entry.repositoryStateId);
if (out.getVersion().onOrAfter(Version.V_7_0_0)) {
out.writeOptionalString(entry.failure);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -591,8 +591,6 @@ private class SnapshotStateExecutor implements ClusterStateTaskExecutor<UpdateIn
// TODO: Add PARTIAL_SUCCESS status?
SnapshotsInProgress.Entry updatedEntry = new SnapshotsInProgress.Entry(entry, State.SUCCESS, shards.build());
entries.add(updatedEntry);
// Finalize snapshot in the repository
snapshotsService.endSnapshot(updatedEntry);
}
} else {
entries.add(entry);
Expand Down
Loading