Skip to content

Commit

Permalink
[Backport 2.x] [Snapshot Interop] Add Changes in Create Snapshot Flow…
Browse files Browse the repository at this point in the history
… for remote store interoperability. (#8071)

Signed-off-by: Harish Bhakuni <[email protected]>
  • Loading branch information
harishbhakuni authored Jun 15, 2023
1 parent d868857 commit b642e69
Show file tree
Hide file tree
Showing 26 changed files with 1,541 additions and 109 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add new query profile collector fields with concurrent search execution ([#7898](https://github.com/opensearch-project/OpenSearch/pull/7898))
- Align range and default value for deletes_pct_allowed in merge policy ([#7730](https://github.com/opensearch-project/OpenSearch/pull/7730))
- Rename QueryPhase actors like Suggest, Rescore to be processors rather than phase ([#8025](https://github.com/opensearch-project/OpenSearch/pull/8025))
- [Snapshot Interop] Add Changes in Create Snapshot Flow for remote store interoperability. ([#8071](https://github.com/opensearch-project/OpenSearch/pull/8071))

### Deprecated

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -776,7 +776,8 @@ public ClusterState.Custom randomCreate(String name) {
ImmutableOpenMap.of(),
null,
SnapshotInfoTests.randomUserMetadata(),
randomVersion(random())
randomVersion(random()),
false
)
)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,8 @@ public static Entry startedEntry(
long repositoryStateId,
ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards,
Map<String, Object> userMetadata,
Version version
Version version,
boolean remoteStoreIndexShallowCopy
) {
return new SnapshotsInProgress.Entry(
snapshot,
Expand All @@ -137,7 +138,8 @@ public static Entry startedEntry(
shards,
null,
userMetadata,
version
version,
remoteStoreIndexShallowCopy
);
}

Expand Down Expand Up @@ -174,7 +176,8 @@ public static Entry startClone(
Collections.emptyMap(),
version,
source,
ImmutableOpenMap.of()
ImmutableOpenMap.of(),
false // TODO: need to pull this value from the original snapshot, use whatever we set during snapshot create.
);
}

Expand All @@ -187,6 +190,7 @@ public static class Entry implements Writeable, ToXContent, RepositoryOperation
private final State state;
private final Snapshot snapshot;
private final boolean includeGlobalState;
private final boolean remoteStoreIndexShallowCopy;
private final boolean partial;
/**
* Map of {@link ShardId} to {@link ShardSnapshotStatus} tracking the state of each shard snapshot operation.
Expand Down Expand Up @@ -229,7 +233,8 @@ public Entry(
ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards,
String failure,
Map<String, Object> userMetadata,
Version version
Version version,
boolean remoteStoreIndexShallowCopy
) {
this(
snapshot,
Expand All @@ -245,7 +250,8 @@ public Entry(
userMetadata,
version,
null,
ImmutableOpenMap.of()
ImmutableOpenMap.of(),
remoteStoreIndexShallowCopy
);
}

Expand All @@ -263,7 +269,8 @@ private Entry(
Map<String, Object> userMetadata,
Version version,
@Nullable SnapshotId source,
@Nullable ImmutableOpenMap<RepositoryShardId, ShardSnapshotStatus> clones
@Nullable ImmutableOpenMap<RepositoryShardId, ShardSnapshotStatus> clones,
boolean remoteStoreIndexShallowCopy
) {
this.state = state;
this.snapshot = snapshot;
Expand All @@ -284,6 +291,7 @@ private Entry(
} else {
this.clones = clones;
}
this.remoteStoreIndexShallowCopy = remoteStoreIndexShallowCopy;
assert assertShardsConsistent(this.source, this.state, this.indices, this.shards, this.clones);
}

Expand Down Expand Up @@ -324,6 +332,11 @@ private Entry(StreamInput in) throws IOException {
source = null;
clones = ImmutableOpenMap.of();
}
if (in.getVersion().onOrAfter(Version.V_2_9_0)) {
remoteStoreIndexShallowCopy = in.readBoolean();
} else {
remoteStoreIndexShallowCopy = false;
}
}

private static boolean assertShardsConsistent(
Expand Down Expand Up @@ -378,7 +391,8 @@ public Entry(
long repositoryStateId,
ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards,
Map<String, Object> userMetadata,
Version version
Version version,
boolean remoteStoreIndexShallowCopy
) {
this(
snapshot,
Expand All @@ -392,7 +406,8 @@ public Entry(
shards,
null,
userMetadata,
version
version,
remoteStoreIndexShallowCopy
);
}

Expand All @@ -417,7 +432,8 @@ public Entry(
shards,
failure,
entry.userMetadata,
version
version,
entry.remoteStoreIndexShallowCopy
);
}

Expand All @@ -441,7 +457,8 @@ public Entry withRepoGen(long newRepoGen) {
userMetadata,
version,
source,
clones
clones,
remoteStoreIndexShallowCopy
);
}

Expand All @@ -463,7 +480,8 @@ public Entry withClones(ImmutableOpenMap<RepositoryShardId, ShardSnapshotStatus>
userMetadata,
version,
source,
updatedClones
updatedClones,
remoteStoreIndexShallowCopy
);
}

Expand Down Expand Up @@ -518,7 +536,8 @@ public Entry fail(ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards, State s
userMetadata,
version,
source,
clones
clones,
remoteStoreIndexShallowCopy
);
}

Expand All @@ -544,7 +563,8 @@ public Entry withShardStates(ImmutableOpenMap<ShardId, ShardSnapshotStatus> shar
shards,
failure,
userMetadata,
version
version,
remoteStoreIndexShallowCopy
);
}
return withStartedShards(shards);
Expand All @@ -567,7 +587,8 @@ public Entry withStartedShards(ImmutableOpenMap<ShardId, ShardSnapshotStatus> sh
shards,
failure,
userMetadata,
version
version,
remoteStoreIndexShallowCopy
);
assert updated.state().completed() == false && completed(updated.shards().values()) == false
: "Only running snapshots allowed but saw [" + updated + "]";
Expand Down Expand Up @@ -599,6 +620,10 @@ public boolean includeGlobalState() {
return includeGlobalState;
}

public boolean remoteStoreIndexShallowCopy() {
return remoteStoreIndexShallowCopy;
}

public Map<String, Object> userMetadata() {
return userMetadata;
}
Expand Down Expand Up @@ -662,7 +687,7 @@ public boolean equals(Object o) {
if (version.equals(entry.version) == false) return false;
if (Objects.equals(source, ((Entry) o).source) == false) return false;
if (clones.equals(((Entry) o).clones) == false) return false;

if (remoteStoreIndexShallowCopy != entry.remoteStoreIndexShallowCopy) return false;
return true;
}

Expand All @@ -679,6 +704,7 @@ public int hashCode() {
result = 31 * result + version.hashCode();
result = 31 * result + (source == null ? 0 : source.hashCode());
result = 31 * result + clones.hashCode();
result = 31 * result + (remoteStoreIndexShallowCopy ? 1 : 0);
return result;
}

Expand Down Expand Up @@ -752,6 +778,9 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeOptionalWriteable(source);
out.writeMap(clones);
}
if (out.getVersion().onOrAfter(Version.V_2_9_0)) {
out.writeBoolean(remoteStoreIndexShallowCopy);
}
}

@Override
Expand Down
40 changes: 40 additions & 0 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -1486,6 +1486,46 @@ public GatedCloseable<IndexCommit> acquireLastIndexCommit(boolean flushFirst) th
}
}

public GatedCloseable<IndexCommit> acquireLastIndexCommitAndRefresh(boolean flushFirst) throws EngineException {
GatedCloseable<IndexCommit> indexCommit = acquireLastIndexCommit(flushFirst);
getEngine().refresh("Snapshot for Remote Store based Shard");
return indexCommit;
}

/**
*
* @param snapshotId Snapshot UUID.
* @param primaryTerm current primary term.
* @param generation Snapshot Commit Generation.
* @throws IOException if there is some failure in acquiring lock in remote store.
*/
public void acquireLockOnCommitData(String snapshotId, long primaryTerm, long generation) throws IOException {
RemoteSegmentStoreDirectory remoteSegmentStoreDirectory = getRemoteSegmentDirectoryForShard();
remoteSegmentStoreDirectory.acquireLock(primaryTerm, generation, snapshotId);
}

/**
*
* @param snapshotId Snapshot UUID.
* @param primaryTerm current primary term.
* @param generation Snapshot Commit Generation.
* @throws IOException if there is some failure in releasing lock in remote store.
*/
public void releaseLockOnCommitData(String snapshotId, long primaryTerm, long generation) throws IOException {
RemoteSegmentStoreDirectory remoteSegmentStoreDirectory = getRemoteSegmentDirectoryForShard();
remoteSegmentStoreDirectory.releaseLock(primaryTerm, generation, snapshotId);
}

private RemoteSegmentStoreDirectory getRemoteSegmentDirectoryForShard() {
FilterDirectory remoteStoreDirectory = (FilterDirectory) remoteStore.directory();
assert remoteStoreDirectory.getDelegate() instanceof FilterDirectory
: "Store.directory is not enclosing an instance of FilterDirectory";
FilterDirectory byteSizeCachingStoreDirectory = (FilterDirectory) remoteStoreDirectory.getDelegate();
final Directory remoteDirectory = byteSizeCachingStoreDirectory.getDelegate();
assert remoteDirectory instanceof RemoteSegmentStoreDirectory : "remoteDirectory is not an instance of RemoteSegmentStoreDirectory";
return ((RemoteSegmentStoreDirectory) remoteDirectory);
}

public Optional<NRTReplicationEngine> getReplicationEngine() {
if (getEngine() instanceof NRTReplicationEngine) {
return Optional.of((NRTReplicationEngine) getEngine());
Expand Down
Loading

0 comments on commit b642e69

Please sign in to comment.