From f7042019de42342e58b318b7f4b08484b9328c92 Mon Sep 17 00:00:00 2001 From: Andrew Ross Date: Tue, 6 Jun 2023 19:13:47 -0500 Subject: [PATCH] Fix changelog entries (#7946) Move entry for #7321 to correct section and fix the malformed entry from #7452. Signed-off-by: Andrew Ross --- CHANGELOG.md | 10 +- .../cluster/ClusterStateDiffIT.java | 3 +- .../create/CreateSnapshotRequest.java | 28 +- .../cluster/SnapshotsInProgress.java | 59 +++- .../opensearch/index/shard/IndexShard.java | 6 + .../RemoteStoreShardShallowCopySnapshot.java | 280 ++++++++++++++++++ .../repositories/FilterRepository.java | 39 +++ .../opensearch/repositories/Repository.java | 75 ++++- .../blobstore/BlobStoreRepository.java | 111 +++++++ .../opensearch/snapshots/SnapshotInfo.java | 66 ++++- .../snapshots/SnapshotShardsService.java | 111 +++++-- .../snapshots/SnapshotsService.java | 15 +- .../create/CreateSnapshotResponseTests.java | 3 +- .../get/GetSnapshotsResponseTests.java | 3 +- .../DeleteDataStreamRequestTests.java | 2 +- .../MetadataDeleteIndexServiceTests.java | 3 +- .../MetadataIndexStateServiceTests.java | 3 +- .../index/shard/IndexShardTests.java | 10 + .../RepositoriesServiceTests.java | 27 ++ .../BlobStoreRepositoryRestoreTests.java | 3 +- .../snapshots/SnapshotInfoTests.java | 30 +- ...SnapshotsInProgressSerializationTests.java | 3 +- .../snapshots/SnapshotsServiceTests.java | 3 +- ...ckEventuallyConsistentRepositoryTests.java | 9 +- .../index/shard/RestoreOnlyRepository.java | 15 + .../AbstractSnapshotIntegTestCase.java | 3 +- 26 files changed, 824 insertions(+), 96 deletions(-) create mode 100644 server/src/main/java/org/opensearch/index/snapshots/blobstore/RemoteStoreShardShallowCopySnapshot.java diff --git a/CHANGELOG.md b/CHANGELOG.md index 9ceb36bd41ba3..e5d3ec26e5498 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -60,7 +60,6 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Add ZSTD compression for snapshotting ([#2996](https://github.com/opensearch-project/OpenSearch/pull/2996)) - Change `com.amazonaws.sdk.ec2MetadataServiceEndpointOverride` to `aws.ec2MetadataServiceEndpoint` ([7372](https://github.com/opensearch-project/OpenSearch/pull/7372/)) - Change `com.amazonaws.sdk.stsEndpointOverride` to `aws.stsEndpointOverride` ([7372](https://github.com/opensearch-project/OpenSearch/pull/7372/)) -- Compress and cache cluster state during validate join request ([#7321](https://github.com/opensearch-project/OpenSearch/pull/7321)) ### Deprecated @@ -90,7 +89,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), ### Security ## [Unreleased 2.x] -### Add TokenManager Interface ([#7452](https://github.com/opensearch-project/OpenSearch/pull/7452)) +### Added +- Add TokenManager Interface ([#7452](https://github.com/opensearch-project/OpenSearch/pull/7452)) ### Dependencies - Bump `jackson` from 2.15.1 to 2.15.2 ([#7897](https://github.com/opensearch-project/OpenSearch/pull/7897)) @@ -99,13 +99,15 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), ### Changed - Replace jboss-annotations-api_1.2_spec with jakarta.annotation-api ([#7836](https://github.com/opensearch-project/OpenSearch/pull/7836)) - Add min, max, average and thread info to resource stats in tasks API ([#7673](https://github.com/opensearch-project/OpenSearch/pull/7673)) +- Compress and cache cluster state during validate join request ([#7321](https://github.com/opensearch-project/OpenSearch/pull/7321)) +- [Snapshot Interop] Add Changes in Create Snapshot Flow for remote store interoperability. ([#7118](https://github.com/opensearch-project/OpenSearch/pull/7118)) ### Deprecated -### Removed +### Removedg ### Fixed -- Fixing error: adding a new/forgotten parameter to the configuration for checking the config on startup in plugins/repository-s3 #7924 +- Fixing error: adding a new/forgotten parameter to the configuration for checking the config on startup in plugins/repository-s3 #7924 ### Security diff --git a/server/src/internalClusterTest/java/org/opensearch/cluster/ClusterStateDiffIT.java b/server/src/internalClusterTest/java/org/opensearch/cluster/ClusterStateDiffIT.java index ead6bc40a953e..6009ad8382e54 100644 --- a/server/src/internalClusterTest/java/org/opensearch/cluster/ClusterStateDiffIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/cluster/ClusterStateDiffIT.java @@ -776,7 +776,8 @@ public ClusterState.Custom randomCreate(String name) { Map.of(), null, SnapshotInfoTests.randomUserMetadata(), - randomVersion(random()) + randomVersion(random()), + false ) ) ); diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/snapshots/create/CreateSnapshotRequest.java b/server/src/main/java/org/opensearch/action/admin/cluster/snapshots/create/CreateSnapshotRequest.java index 0226b20b5bf34..5a237241797b9 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/snapshots/create/CreateSnapshotRequest.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/snapshots/create/CreateSnapshotRequest.java @@ -34,6 +34,7 @@ import org.opensearch.OpenSearchException; import org.opensearch.OpenSearchGenerationException; +import org.opensearch.Version; import org.opensearch.action.ActionRequestValidationException; import org.opensearch.action.IndicesRequest; import org.opensearch.action.support.IndicesOptions; @@ -101,6 +102,10 @@ public class CreateSnapshotRequest extends ClusterManagerNodeRequest userMetadata; + private Boolean remoteStoreIndexShallowCopy; + + private static final String REMOTE_STORE_INDEX_SHALLOW_COPY = "remote_store_index_shallow_copy"; + public CreateSnapshotRequest() {} /** @@ -125,6 +130,9 @@ public CreateSnapshotRequest(StreamInput in) throws IOException { waitForCompletion = in.readBoolean(); partial = in.readBoolean(); userMetadata = in.readMap(); + if (in.getVersion().onOrAfter(Version.V_2_8_0)) { + remoteStoreIndexShallowCopy = in.readOptionalBoolean(); + } } @Override @@ -139,6 +147,9 @@ public void writeTo(StreamOutput out) throws IOException { out.writeBoolean(waitForCompletion); out.writeBoolean(partial); out.writeMap(userMetadata); + if (out.getVersion().onOrAfter(Version.V_2_8_0)) { + out.writeOptionalBoolean(remoteStoreIndexShallowCopy); + } } @Override @@ -328,6 +339,11 @@ public CreateSnapshotRequest waitForCompletion(boolean waitForCompletion) { return this; } + public CreateSnapshotRequest remoteStoreIndexShallowCopy(boolean remoteStoreIndexShallowCopy) { + this.remoteStoreIndexShallowCopy = remoteStoreIndexShallowCopy; + return this; + } + /** * Returns true if the request should wait for the snapshot completion before returning * @@ -429,6 +445,10 @@ public Map userMetadata() { return userMetadata; } + public Boolean remoteStoreIndexShallowCopy() { + return remoteStoreIndexShallowCopy; + } + public CreateSnapshotRequest userMetadata(Map userMetadata) { this.userMetadata = userMetadata; return this; @@ -466,6 +486,8 @@ public CreateSnapshotRequest source(Map source) { throw new IllegalArgumentException("malformed metadata, should be an object"); } userMetadata((Map) entry.getValue()); + } else if (name.equals(REMOTE_STORE_INDEX_SHALLOW_COPY)) { + remoteStoreIndexShallowCopy = nodeBooleanValue(entry.getValue(), REMOTE_STORE_INDEX_SHALLOW_COPY); } } indicesOptions(IndicesOptions.fromMap(source, indicesOptions)); @@ -495,6 +517,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws indicesOptions.toXContent(builder, params); } builder.field("metadata", userMetadata); + builder.field(REMOTE_STORE_INDEX_SHALLOW_COPY, remoteStoreIndexShallowCopy); builder.endObject(); return builder; } @@ -518,7 +541,8 @@ public boolean equals(Object o) { && Objects.equals(indicesOptions, that.indicesOptions) && Objects.equals(settings, that.settings) && Objects.equals(clusterManagerNodeTimeout, that.clusterManagerNodeTimeout) - && Objects.equals(userMetadata, that.userMetadata); + && Objects.equals(userMetadata, that.userMetadata) + && Objects.equals(remoteStoreIndexShallowCopy, that.remoteStoreIndexShallowCopy); } @Override @@ -562,6 +586,8 @@ public String toString() { + clusterManagerNodeTimeout + ", metadata=" + userMetadata + + ", remoteStoreIndexShallowCopy=" + + remoteStoreIndexShallowCopy + '}'; } } diff --git a/server/src/main/java/org/opensearch/cluster/SnapshotsInProgress.java b/server/src/main/java/org/opensearch/cluster/SnapshotsInProgress.java index 494933f613910..08435557e2281 100644 --- a/server/src/main/java/org/opensearch/cluster/SnapshotsInProgress.java +++ b/server/src/main/java/org/opensearch/cluster/SnapshotsInProgress.java @@ -113,7 +113,8 @@ public static Entry startedEntry( long repositoryStateId, final Map shards, Map userMetadata, - Version version + Version version, + boolean remoteStoreIndexShallowCopy ) { return new SnapshotsInProgress.Entry( snapshot, @@ -127,7 +128,8 @@ public static Entry startedEntry( shards, null, userMetadata, - version + version, + remoteStoreIndexShallowCopy ); } @@ -164,7 +166,8 @@ public static Entry startClone( Collections.emptyMap(), version, source, - Map.of() + Map.of(), + false // TODO: need to pull this value from the original snapshot, use whatever we set during snapshot create. ); } @@ -177,6 +180,7 @@ public static class Entry implements Writeable, ToXContent, RepositoryOperation private final State state; private final Snapshot snapshot; private final boolean includeGlobalState; + private final boolean remoteStoreIndexShallowCopy; private final boolean partial; /** * Map of {@link ShardId} to {@link ShardSnapshotStatus} tracking the state of each shard snapshot operation. @@ -219,7 +223,8 @@ public Entry( final Map shards, String failure, Map userMetadata, - Version version + Version version, + boolean remoteStoreIndexShallowCopy ) { this( snapshot, @@ -235,7 +240,8 @@ public Entry( userMetadata, version, null, - Map.of() + Map.of(), + remoteStoreIndexShallowCopy ); } @@ -253,7 +259,8 @@ private Entry( final Map userMetadata, Version version, @Nullable SnapshotId source, - @Nullable final Map clones + @Nullable final Map clones, + boolean remoteStoreIndexShallowCopy ) { this.state = state; this.snapshot = snapshot; @@ -274,6 +281,7 @@ private Entry( } else { this.clones = Collections.unmodifiableMap(clones); } + this.remoteStoreIndexShallowCopy = remoteStoreIndexShallowCopy; assert assertShardsConsistent(this.source, this.state, this.indices, this.shards, this.clones); } @@ -292,6 +300,11 @@ private Entry(StreamInput in) throws IOException { dataStreams = in.readStringList(); source = in.readOptionalWriteable(SnapshotId::new); clones = in.readMap(RepositoryShardId::new, ShardSnapshotStatus::readFrom); + if (in.getVersion().onOrAfter(Version.V_2_8_0)) { + remoteStoreIndexShallowCopy = in.readBoolean(); + } else { + remoteStoreIndexShallowCopy = false; + } } private static boolean assertShardsConsistent( @@ -346,7 +359,8 @@ public Entry( long repositoryStateId, final Map shards, Map userMetadata, - Version version + Version version, + boolean remoteStoreIndexShallowCopy ) { this( snapshot, @@ -360,7 +374,8 @@ public Entry( shards, null, userMetadata, - version + version, + remoteStoreIndexShallowCopy ); } @@ -385,7 +400,8 @@ public Entry( shards, failure, entry.userMetadata, - version + version, + entry.remoteStoreIndexShallowCopy ); } @@ -409,7 +425,8 @@ public Entry withRepoGen(long newRepoGen) { userMetadata, version, source, - clones + clones, + remoteStoreIndexShallowCopy ); } @@ -431,7 +448,8 @@ public Entry withClones(final Map update userMetadata, version, source, - updatedClones + updatedClones, + remoteStoreIndexShallowCopy ); } @@ -486,7 +504,8 @@ public Entry fail(final Map shards, State state, S userMetadata, version, source, - clones + clones, + remoteStoreIndexShallowCopy ); } @@ -512,7 +531,8 @@ public Entry withShardStates(final Map shards) { shards, failure, userMetadata, - version + version, + remoteStoreIndexShallowCopy ); } return withStartedShards(shards); @@ -535,7 +555,8 @@ public Entry withStartedShards(final Map shards) { shards, failure, userMetadata, - version + version, + remoteStoreIndexShallowCopy ); assert updated.state().completed() == false && completed(updated.shards().values()) == false : "Only running snapshots allowed but saw [" + updated + "]"; @@ -567,6 +588,10 @@ public boolean includeGlobalState() { return includeGlobalState; } + public boolean remoteStoreIndexShallowCopy() { + return remoteStoreIndexShallowCopy; + } + public Map userMetadata() { return userMetadata; } @@ -630,7 +655,7 @@ public boolean equals(Object o) { if (version.equals(entry.version) == false) return false; if (Objects.equals(source, ((Entry) o).source) == false) return false; if (clones.equals(((Entry) o).clones) == false) return false; - + if (remoteStoreIndexShallowCopy != entry.remoteStoreIndexShallowCopy) return false; return true; } @@ -647,6 +672,7 @@ public int hashCode() { result = 31 * result + version.hashCode(); result = 31 * result + (source == null ? 0 : source.hashCode()); result = 31 * result + clones.hashCode(); + result = 31 * result + (remoteStoreIndexShallowCopy ? 1 : 0); return result; } @@ -710,6 +736,9 @@ public void writeTo(StreamOutput out) throws IOException { out.writeStringCollection(dataStreams); out.writeOptionalWriteable(source); out.writeMap(clones, (o, v) -> v.writeTo(o), (o, v) -> v.writeTo(o)); + if (out.getVersion().onOrAfter(Version.V_2_8_0)) { + out.writeBoolean(remoteStoreIndexShallowCopy); + } } @Override diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index d6ab3a764abf2..bed7709005494 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -1476,6 +1476,12 @@ public GatedCloseable acquireLastIndexCommit(boolean flushFirst) th } } + public GatedCloseable acquireLastIndexCommitAndRefresh(boolean flushFirst) throws EngineException { + GatedCloseable indexCommit = acquireLastIndexCommit(flushFirst); + getEngine().refresh("Snapshot for Remote Store based Shard"); + return indexCommit; + } + public Optional getReplicationEngine() { if (getEngine() instanceof NRTReplicationEngine) { return Optional.of((NRTReplicationEngine) getEngine()); diff --git a/server/src/main/java/org/opensearch/index/snapshots/blobstore/RemoteStoreShardShallowCopySnapshot.java b/server/src/main/java/org/opensearch/index/snapshots/blobstore/RemoteStoreShardShallowCopySnapshot.java new file mode 100644 index 0000000000000..cc32aeaf0c39f --- /dev/null +++ b/server/src/main/java/org/opensearch/index/snapshots/blobstore/RemoteStoreShardShallowCopySnapshot.java @@ -0,0 +1,280 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.snapshots.blobstore; + +import org.opensearch.OpenSearchParseException; +import org.opensearch.core.ParseField; +import org.opensearch.core.xcontent.ToXContentFragment; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.core.xcontent.XContentParser; +import org.opensearch.common.xcontent.XContentParserUtils; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +/** + * Remote Store based Shard snapshot metadata + * + * @opensearch.internal + */ +public class RemoteStoreShardShallowCopySnapshot implements ToXContentFragment { + + private final String snapshot; + private final long indexVersion; + private final long startTime; + private final long time; + private final int incrementalFileCount; + private final long incrementalSize; + private final long primaryTerm; + private final long commitGeneration; + private final String remoteStoreRepository; + + private final String indexUUID; + + private static final String NAME = "name"; + private static final String INDEX_VERSION = "index_version"; + private static final String START_TIME = "start_time"; + private static final String TIME = "time"; + + private static final String INDEX_UUID = "index_uuid"; + + private static final String REMOTE_STORE_REPOSITORY = "remote_store_repository"; + + private static final String PRIMARY_TERM = "primary_term"; + + private static final String COMMIT_GENERATION = "commit_generation"; + + private static final String INCREMENTAL_FILE_COUNT = "number_of_files"; + private static final String INCREMENTAL_SIZE = "total_size"; + + private static final ParseField PARSE_NAME = new ParseField(NAME); + + private static final ParseField PARSE_PRIMARY_TERM = new ParseField(PRIMARY_TERM); + private static final ParseField PARSE_COMMIT_GENERATION = new ParseField(COMMIT_GENERATION); + private static final ParseField PARSE_INDEX_VERSION = new ParseField(INDEX_VERSION, "index-version"); + private static final ParseField PARSE_START_TIME = new ParseField(START_TIME); + private static final ParseField PARSE_TIME = new ParseField(TIME); + private static final ParseField PARSE_INCREMENTAL_FILE_COUNT = new ParseField(INCREMENTAL_FILE_COUNT); + private static final ParseField PARSE_INCREMENTAL_SIZE = new ParseField(INCREMENTAL_SIZE); + + private static final ParseField PARSE_INDEX_UUID = new ParseField(INDEX_UUID); + + private static final ParseField PARSE_REMOTE_STORE_REPOSITORY = new ParseField(REMOTE_STORE_REPOSITORY); + + /** + * Serializes shard snapshot metadata info into JSON + * + * @param builder XContent builder + * @param params parameters + */ + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.field(NAME, snapshot); + builder.field(INDEX_VERSION, indexVersion); + builder.field(START_TIME, startTime); + builder.field(TIME, time); + builder.field(INCREMENTAL_FILE_COUNT, incrementalFileCount); + builder.field(INCREMENTAL_SIZE, incrementalSize); + builder.field(INDEX_UUID, indexUUID); + builder.field(REMOTE_STORE_REPOSITORY, remoteStoreRepository); + builder.field(COMMIT_GENERATION, commitGeneration); + builder.field(PRIMARY_TERM, primaryTerm); + + return builder; + } + + public RemoteStoreShardShallowCopySnapshot( + String snapshot, + long indexVersion, + long primaryTerm, + long commitGeneration, + long startTime, + long time, + int incrementalFileCount, + long incrementalSize, + String indexUUID, + String remoteStoreRepository + ) { + assert snapshot != null; + assert indexVersion >= 0; + assert commitGeneration >= 0 && primaryTerm >= 0; + this.snapshot = snapshot; + this.indexVersion = indexVersion; + this.primaryTerm = primaryTerm; + this.commitGeneration = commitGeneration; + this.startTime = startTime; + this.time = time; + this.incrementalFileCount = incrementalFileCount; + this.incrementalSize = incrementalSize; + this.indexUUID = indexUUID; + this.remoteStoreRepository = remoteStoreRepository; + } + + /** + * Parses shard snapshot metadata + * + * @param parser parser + * @return shard snapshot metadata + */ + public static RemoteStoreShardShallowCopySnapshot fromXContent(XContentParser parser) throws IOException { + String snapshot = null; + long indexVersion = -1; + long startTime = 0; + long time = 0; + int incrementalFileCount = 0; + long incrementalSize = 0; + String indexUUID = null; + String remoteStoreRepository = null; + long primaryTerm = -1; + long commitGeneration = -1; + + List indexFiles = new ArrayList<>(); + if (parser.currentToken() == null) { // fresh parser? move to the first token + parser.nextToken(); + } + XContentParser.Token token = parser.currentToken(); + if (token == XContentParser.Token.START_OBJECT) { + while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { + XContentParserUtils.ensureExpectedToken(XContentParser.Token.FIELD_NAME, token, parser); + final String currentFieldName = parser.currentName(); + token = parser.nextToken(); + if (token.isValue()) { + if (PARSE_NAME.match(currentFieldName, parser.getDeprecationHandler())) { + snapshot = parser.text(); + } else if (PARSE_INDEX_VERSION.match(currentFieldName, parser.getDeprecationHandler())) { + indexVersion = parser.longValue(); + } else if (PARSE_PRIMARY_TERM.match(currentFieldName, parser.getDeprecationHandler())) { + primaryTerm = parser.longValue(); + } else if (PARSE_COMMIT_GENERATION.match(currentFieldName, parser.getDeprecationHandler())) { + commitGeneration = parser.longValue(); + } else if (PARSE_START_TIME.match(currentFieldName, parser.getDeprecationHandler())) { + startTime = parser.longValue(); + } else if (PARSE_TIME.match(currentFieldName, parser.getDeprecationHandler())) { + time = parser.longValue(); + } else if (PARSE_INCREMENTAL_FILE_COUNT.match(currentFieldName, parser.getDeprecationHandler())) { + incrementalFileCount = parser.intValue(); + } else if (PARSE_INCREMENTAL_SIZE.match(currentFieldName, parser.getDeprecationHandler())) { + incrementalSize = parser.longValue(); + } else if (PARSE_INDEX_UUID.match(currentFieldName, parser.getDeprecationHandler())) { + indexUUID = parser.text(); + } else if (PARSE_REMOTE_STORE_REPOSITORY.match(currentFieldName, parser.getDeprecationHandler())) { + remoteStoreRepository = parser.text(); + } else { + throw new OpenSearchParseException("unknown parameter [{}]", currentFieldName); + } + } else { + throw new OpenSearchParseException("unexpected token [{}]", token); + } + } + } + + return new RemoteStoreShardShallowCopySnapshot( + snapshot, + indexVersion, + primaryTerm, + commitGeneration, + startTime, + time, + incrementalFileCount, + incrementalSize, + indexUUID, + remoteStoreRepository + ); + } + + /** + * Returns shard primary Term during snapshot creation + * + * @return primary Term + */ + public long getPrimaryTerm() { + return primaryTerm; + } + + /** + * Returns snapshot commit generation + * + * @return commit Generation + */ + public long getCommitGeneration() { + return commitGeneration; + } + + /** + * Returns Index UUID + * + * @return index UUID + */ + public String getIndexUUID() { + return indexUUID; + } + + public String getRemoteStoreRepository() { + return remoteStoreRepository; + } + + /** + * Returns snapshot name + * + * @return snapshot name + */ + public String snapshot() { + return snapshot; + } + + /** + * Returns list of files in the shard + * + * @return list of files + */ + + /** + * Returns snapshot start time + */ + public long startTime() { + return startTime; + } + + /** + * Returns snapshot running time + */ + public long time() { + return time; + } + + /** + * Returns incremental of files that were snapshotted + */ + public int incrementalFileCount() { + return incrementalFileCount; + } + + /** + * Returns total number of files that are referenced by this snapshot + */ + public int totalFileCount() { + return 0; + } + + /** + * Returns incremental of files size that were snapshotted + */ + public long incrementalSize() { + return incrementalSize; + } + + /** + * Returns total size of all files that where snapshotted + */ + public long totalSize() { + return 0; + } + +} diff --git a/server/src/main/java/org/opensearch/repositories/FilterRepository.java b/server/src/main/java/org/opensearch/repositories/FilterRepository.java index a6a649fa2cd44..94b9867e9d1e2 100644 --- a/server/src/main/java/org/opensearch/repositories/FilterRepository.java +++ b/server/src/main/java/org/opensearch/repositories/FilterRepository.java @@ -45,6 +45,7 @@ import org.opensearch.index.mapper.MapperService; import org.opensearch.index.shard.ShardId; import org.opensearch.index.snapshots.IndexShardSnapshotStatus; +import org.opensearch.index.snapshots.blobstore.RemoteStoreShardShallowCopySnapshot; import org.opensearch.index.store.Store; import org.opensearch.indices.recovery.RecoveryState; import org.opensearch.snapshots.SnapshotId; @@ -182,6 +183,35 @@ public void snapshotShard( ); } + @Override + public void snapshotRemoteStoreIndexShard( + Store store, + MapperService mapperService, + SnapshotId snapshotId, + IndexId indexId, + IndexCommit snapshotIndexCommit, + String shardStateIdentifier, + IndexShardSnapshotStatus snapshotStatus, + Version repositoryMetaVersion, + Map userMetadata, + long primaryTerm, + ActionListener listener + ) { + in.snapshotRemoteStoreIndexShard( + store, + mapperService, + snapshotId, + indexId, + snapshotIndexCommit, + shardStateIdentifier, + snapshotStatus, + repositoryMetaVersion, + userMetadata, + primaryTerm, + listener + ); + } + @Override public void restoreShard( Store store, @@ -194,6 +224,15 @@ public void restoreShard( in.restoreShard(store, snapshotId, indexId, snapshotShardId, recoveryState, listener); } + @Override + public RemoteStoreShardShallowCopySnapshot getRemoteStoreShallowCopyShardMetadata( + SnapshotId snapshotId, + IndexId indexId, + ShardId snapshotShardId + ) { + return in.getRemoteStoreShallowCopyShardMetadata(snapshotId, indexId, snapshotShardId); + } + @Override public IndexShardSnapshotStatus getShardSnapshotStatus(SnapshotId snapshotId, IndexId indexId, ShardId shardId) { return in.getShardSnapshotStatus(snapshotId, indexId, shardId); diff --git a/server/src/main/java/org/opensearch/repositories/Repository.java b/server/src/main/java/org/opensearch/repositories/Repository.java index 1826fe1aa51da..17bdcb10f140b 100644 --- a/server/src/main/java/org/opensearch/repositories/Repository.java +++ b/server/src/main/java/org/opensearch/repositories/Repository.java @@ -46,6 +46,7 @@ import org.opensearch.index.mapper.MapperService; import org.opensearch.index.shard.ShardId; import org.opensearch.index.snapshots.IndexShardSnapshotStatus; +import org.opensearch.index.snapshots.blobstore.RemoteStoreShardShallowCopySnapshot; import org.opensearch.index.store.Store; import org.opensearch.indices.recovery.RecoveryState; import org.opensearch.snapshots.SnapshotId; @@ -224,18 +225,18 @@ default RepositoryStats stats() { *

* As snapshot process progresses, implementation of this method should update {@link IndexShardSnapshotStatus} object and check * {@link IndexShardSnapshotStatus#isAborted()} to see if the snapshot process should be aborted. - * @param store store to be snapshotted - * @param mapperService the shards mapper service - * @param snapshotId snapshot id - * @param indexId id for the index being snapshotted - * @param snapshotIndexCommit commit point - * @param shardStateIdentifier a unique identifier of the state of the shard that is stored with the shard's snapshot and used - * to detect if the shard has changed between snapshots. If {@code null} is passed as the identifier - * snapshotting will be done by inspecting the physical files referenced by {@code snapshotIndexCommit} - * @param snapshotStatus snapshot status - * @param repositoryMetaVersion version of the updated repository metadata to write - * @param userMetadata user metadata of the snapshot found in {@link SnapshotsInProgress.Entry#userMetadata()} - * @param listener listener invoked on completion + * @param store store to be snapshotted + * @param mapperService the shards mapper service + * @param snapshotId snapshot id + * @param indexId id for the index being snapshotted + * @param snapshotIndexCommit commit point + * @param shardStateIdentifier a unique identifier of the state of the shard that is stored with the shard's snapshot and used + * to detect if the shard has changed between snapshots. If {@code null} is passed as the identifier + * snapshotting will be done by inspecting the physical files referenced by {@code snapshotIndexCommit} + * @param snapshotStatus snapshot status + * @param repositoryMetaVersion version of the updated repository metadata to write + * @param userMetadata user metadata of the snapshot found in {@link SnapshotsInProgress.Entry#userMetadata()} + * @param listener listener invoked on completion */ void snapshotShard( Store store, @@ -250,6 +251,42 @@ void snapshotShard( ActionListener listener ); + /** + * Adds a reference of remote store data for a index commit point. + *

+ * The index commit point can be obtained by using {@link org.opensearch.index.engine.Engine#acquireLastIndexCommit} method. + * Repository implementations shouldn't release the snapshot index commit point. It is done by the method caller. + *

+ * As snapshot process progresses, implementation of this method should update {@link IndexShardSnapshotStatus} object and check + * {@link IndexShardSnapshotStatus#isAborted()} to see if the snapshot process should be aborted. + * @param store store to be snapshotted + * @param mapperService the shards mapper service + * @param snapshotId snapshot id + * @param indexId id for the index being snapshotted + * @param snapshotIndexCommit commit point + * @param shardStateIdentifier a unique identifier of the state of the shard that is stored with the shard's snapshot and used + * to detect if the shard has changed between snapshots. If {@code null} is passed as the identifier + * snapshotting will be done by inspecting the physical files referenced by {@code snapshotIndexCommit} + * @param snapshotStatus snapshot status + * @param repositoryMetaVersion version of the updated repository metadata to write + * @param userMetadata user metadata of the snapshot found in {@link SnapshotsInProgress.Entry#userMetadata()} + * @param primaryTerm current Primary Term + * @param listener listener invoked on completion + */ + void snapshotRemoteStoreIndexShard( + Store store, + MapperService mapperService, + SnapshotId snapshotId, + IndexId indexId, + IndexCommit snapshotIndexCommit, + @Nullable String shardStateIdentifier, + IndexShardSnapshotStatus snapshotStatus, + Version repositoryMetaVersion, + Map userMetadata, + long primaryTerm, + ActionListener listener + ); + /** * Restores snapshot of the shard. *

@@ -270,6 +307,20 @@ void restoreShard( ActionListener listener ); + /** + * Returns Snapshot Shard Metadata for remote store interop enabled snapshot. + *

+ * The index can be renamed on restore, hence different {@code shardId} and {@code snapshotShardId} are supplied. + * @param snapshotId snapshot id + * @param indexId id of the index in the repository from which the restore is occurring + * @param snapshotShardId shard id (in the snapshot) + */ + public RemoteStoreShardShallowCopySnapshot getRemoteStoreShallowCopyShardMetadata( + SnapshotId snapshotId, + IndexId indexId, + ShardId snapshotShardId + ); + /** * Retrieve shard snapshot status for the stored snapshot * diff --git a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java index ab6797b745044..f94f055b25beb 100644 --- a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java @@ -107,6 +107,7 @@ import org.opensearch.index.snapshots.IndexShardSnapshotFailedException; import org.opensearch.index.snapshots.IndexShardSnapshotStatus; import org.opensearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot; +import org.opensearch.index.snapshots.blobstore.RemoteStoreShardShallowCopySnapshot; import org.opensearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshots; import org.opensearch.index.snapshots.blobstore.RateLimitingInputStream; import org.opensearch.index.snapshots.blobstore.SlicedInputStream; @@ -240,6 +241,12 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp Setting.Property.NodeScope ); + public static final Setting REMOTE_STORE_INDEX_SHALLOW_COPY = Setting.boolSetting( + "remote_store_index_shallow_copy", + false, + Setting.Property.Dynamic + ); + /** * Setting to set batch size of stale snapshot shard blobs that will be deleted by snapshot workers as part of snapshot deletion. * For optimal performance the value of the setting should be equal to or close to repository's max # of keys that can be deleted in single operation @@ -313,6 +320,9 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp BlobStoreIndexShardSnapshot::fromXContent ); + public static final ChecksumBlobStoreFormat REMOTE_STORE_SHARD_SHALLOW_COPY_SNAPSHOT_FORMAT = + new ChecksumBlobStoreFormat<>(SNAPSHOT_CODEC, SNAPSHOT_NAME_FORMAT, RemoteStoreShardShallowCopySnapshot::fromXContent); + public static final ChecksumBlobStoreFormat INDEX_SHARD_SNAPSHOTS_FORMAT = new ChecksumBlobStoreFormat<>( "snapshots", SNAPSHOT_INDEX_NAME_FORMAT, @@ -2290,6 +2300,79 @@ private void writeAtomic(BlobContainer container, final String blobName, final B } } + @Override + public void snapshotRemoteStoreIndexShard( + Store store, + MapperService mapperService, + SnapshotId snapshotId, + IndexId indexId, + IndexCommit snapshotIndexCommit, + String shardStateIdentifier, + IndexShardSnapshotStatus snapshotStatus, + Version repositoryMetaVersion, + Map userMetadata, + long primaryTerm, + ActionListener listener + ) { + if (isReadOnly()) { + listener.onFailure(new RepositoryException(metadata.name(), "cannot snapshot shard on a readonly repository")); + return; + } + final ShardId shardId = store.shardId(); + final long startTime = threadPool.absoluteTimeInMillis(); + try { + final String generation = snapshotStatus.generation(); + logger.info("[{}] [{}] snapshot to [{}] [{}] ...", shardId, snapshotId, metadata.name(), generation); + final BlobContainer shardContainer = shardContainer(indexId, shardId); + + long indexTotalFileSize = 0; + Collection fileNames = snapshotIndexCommit.getFileNames(); + for (String fileName : fileNames) { + indexTotalFileSize += store.getMetadata(snapshotIndexCommit).get(fileName).length(); + } + int indexTotalNumberOfFiles = fileNames.size(); + + snapshotStatus.moveToStarted( + startTime, + 0, // incremental File Count is zero as we are storing the data as part of remote store. + indexTotalNumberOfFiles, + 0, // incremental File Size is zero as we are storing the data as part of remote store. + indexTotalFileSize + ); + + final IndexShardSnapshotStatus.Copy lastSnapshotStatus = snapshotStatus.moveToFinalize(snapshotIndexCommit.getGeneration()); + + // now create and write the commit point + logger.trace("[{}] [{}] writing shard snapshot file", shardId, snapshotId); + try { + REMOTE_STORE_SHARD_SHALLOW_COPY_SNAPSHOT_FORMAT.write( + new RemoteStoreShardShallowCopySnapshot( + snapshotId.getName(), + lastSnapshotStatus.getIndexVersion(), + primaryTerm, + snapshotIndexCommit.getGeneration(), + lastSnapshotStatus.getStartTime(), + threadPool.absoluteTimeInMillis() - lastSnapshotStatus.getStartTime(), + lastSnapshotStatus.getIncrementalFileCount(), + lastSnapshotStatus.getIncrementalSize(), + store.indexSettings().getUUID(), + store.indexSettings().getRemoteStoreRepository() + ), + shardContainer, + snapshotId.getUUID(), + compress + ); + } catch (IOException e) { + throw new IndexShardSnapshotFailedException(shardId, "Failed to write commit point", e); + } + snapshotStatus.moveToDone(threadPool.absoluteTimeInMillis(), generation); + listener.onResponse(generation); + + } catch (Exception e) { + listener.onFailure(e); + } + } + @Override public void snapshotShard( Store store, @@ -2541,6 +2624,16 @@ private static boolean assertFileContentsMatchHash(BlobStoreIndexShardSnapshot.F return true; } + @Override + public RemoteStoreShardShallowCopySnapshot getRemoteStoreShallowCopyShardMetadata( + SnapshotId snapshotId, + IndexId indexId, + ShardId snapshotShardId + ) { + final BlobContainer container = shardContainer(indexId, snapshotShardId); + return loadRemStoreEnabledShardSnapshot(container, snapshotId); + } + @Override public void restoreShard( Store store, @@ -2870,6 +2963,24 @@ private static List unusedBlobs( .collect(Collectors.toList()); } + /** + * Loads information about remote store enabled shard snapshot for remote store interop enabled snapshots + */ + public RemoteStoreShardShallowCopySnapshot loadRemStoreEnabledShardSnapshot(BlobContainer shardContainer, SnapshotId snapshotId) { + try { + return REMOTE_STORE_SHARD_SHALLOW_COPY_SNAPSHOT_FORMAT.read(shardContainer, snapshotId.getUUID(), namedXContentRegistry); + } catch (NoSuchFileException ex) { + throw new SnapshotMissingException(metadata.name(), snapshotId, ex); + } catch (IOException ex) { + throw new SnapshotException( + metadata.name(), + snapshotId, + "failed to read shard snapshot file for [" + shardContainer.path() + ']', + ex + ); + } + } + /** * Loads information about shard snapshot */ diff --git a/server/src/main/java/org/opensearch/snapshots/SnapshotInfo.java b/server/src/main/java/org/opensearch/snapshots/SnapshotInfo.java index b1274f94310fb..366bf8b57a055 100644 --- a/server/src/main/java/org/opensearch/snapshots/SnapshotInfo.java +++ b/server/src/main/java/org/opensearch/snapshots/SnapshotInfo.java @@ -94,6 +94,8 @@ public final class SnapshotInfo implements Comparable, ToXContent, private static final String TOTAL_SHARDS = "total_shards"; private static final String SUCCESSFUL_SHARDS = "successful_shards"; private static final String INCLUDE_GLOBAL_STATE = "include_global_state"; + + private static final String REMOTE_STORE_INDEX_SHALLOW_COPY = "remote_store_index_shallow_copy"; private static final String USER_METADATA = "metadata"; private static final Comparator COMPARATOR = Comparator.comparing(SnapshotInfo::startTime) @@ -115,6 +117,8 @@ public static final class SnapshotInfoBuilder { private long endTime = 0L; private ShardStatsBuilder shardStatsBuilder = null; private Boolean includeGlobalState = null; + + private Boolean remoteStoreIndexShallowCopy = false; private Map userMetadata = null; private int version = -1; private List shardFailures = null; @@ -167,6 +171,10 @@ private void setVersion(int version) { this.version = version; } + private void setRemoteStoreIndexShallowCopy(Boolean remoteStoreIndexShallowCopy) { + this.remoteStoreIndexShallowCopy = remoteStoreIndexShallowCopy; + } + private void setShardFailures(List shardFailures) { this.shardFailures = shardFailures; } @@ -205,7 +213,8 @@ public SnapshotInfo build() { successfulShards, shardFailures, includeGlobalState, - userMetadata + userMetadata, + remoteStoreIndexShallowCopy ); } } @@ -256,6 +265,11 @@ int getSuccessfulShards() { SNAPSHOT_INFO_PARSER.declareBoolean(SnapshotInfoBuilder::setIncludeGlobalState, new ParseField(INCLUDE_GLOBAL_STATE)); SNAPSHOT_INFO_PARSER.declareObject(SnapshotInfoBuilder::setUserMetadata, (p, c) -> p.map(), new ParseField(USER_METADATA)); SNAPSHOT_INFO_PARSER.declareInt(SnapshotInfoBuilder::setVersion, new ParseField(VERSION_ID)); + // TODO: adding it here means it will showup in create snapshot response, confirm if we should do that... + SNAPSHOT_INFO_PARSER.declareBoolean( + SnapshotInfoBuilder::setRemoteStoreIndexShallowCopy, + new ParseField(REMOTE_STORE_INDEX_SHALLOW_COPY) + ); SNAPSHOT_INFO_PARSER.declareObjectArray( SnapshotInfoBuilder::setShardFailures, SnapshotShardFailure.SNAPSHOT_SHARD_FAILURE_PARSER, @@ -289,6 +303,9 @@ int getSuccessfulShards() { @Nullable private Boolean includeGlobalState; + @Nullable + private Boolean remoteStoreIndexShallowCopy; + @Nullable private final Map userMetadata; @@ -298,11 +315,11 @@ int getSuccessfulShards() { private final List shardFailures; public SnapshotInfo(SnapshotId snapshotId, List indices, List dataStreams, SnapshotState state) { - this(snapshotId, indices, dataStreams, state, null, null, 0L, 0L, 0, 0, Collections.emptyList(), null, null); + this(snapshotId, indices, dataStreams, state, null, null, 0L, 0L, 0, 0, Collections.emptyList(), null, null, null); } public SnapshotInfo(SnapshotId snapshotId, List indices, List dataStreams, SnapshotState state, Version version) { - this(snapshotId, indices, dataStreams, state, null, version, 0L, 0L, 0, 0, Collections.emptyList(), null, null); + this(snapshotId, indices, dataStreams, state, null, version, 0L, 0L, 0, 0, Collections.emptyList(), null, null, null); } public SnapshotInfo(SnapshotsInProgress.Entry entry) { @@ -319,7 +336,8 @@ public SnapshotInfo(SnapshotsInProgress.Entry entry) { 0, Collections.emptyList(), entry.includeGlobalState(), - entry.userMetadata() + entry.userMetadata(), + entry.remoteStoreIndexShallowCopy() ); } @@ -333,7 +351,8 @@ public SnapshotInfo( int totalShards, List shardFailures, Boolean includeGlobalState, - Map userMetadata + Map userMetadata, + Boolean remoteStoreIndexShallowCopy ) { this( snapshotId, @@ -348,7 +367,8 @@ public SnapshotInfo( totalShards - shardFailures.size(), shardFailures, includeGlobalState, - userMetadata + userMetadata, + remoteStoreIndexShallowCopy ); } @@ -365,7 +385,8 @@ public SnapshotInfo( int successfulShards, List shardFailures, Boolean includeGlobalState, - Map userMetadata + Map userMetadata, + Boolean remoteStoreIndexShallowCopy ) { this.snapshotId = Objects.requireNonNull(snapshotId); this.indices = Collections.unmodifiableList(Objects.requireNonNull(indices)); @@ -380,6 +401,7 @@ public SnapshotInfo( this.shardFailures = Objects.requireNonNull(shardFailures); this.includeGlobalState = includeGlobalState; this.userMetadata = userMetadata; + this.remoteStoreIndexShallowCopy = remoteStoreIndexShallowCopy; } /** @@ -399,6 +421,9 @@ public SnapshotInfo(final StreamInput in) throws IOException { includeGlobalState = in.readOptionalBoolean(); userMetadata = in.readMap(); dataStreams = in.readStringList(); + if (in.getVersion().onOrAfter(Version.V_2_8_0)) { + remoteStoreIndexShallowCopy = in.readOptionalBoolean(); + } } /** @@ -508,6 +533,10 @@ public Boolean includeGlobalState() { return includeGlobalState; } + public Boolean isRemoteStoreIndexShallowCopyEnabled() { + return remoteStoreIndexShallowCopy; + } + /** * Returns shard failures; an empty list will be returned if there were no shard * failures, or if {@link #state()} returns {@code null}. @@ -573,6 +602,8 @@ public String toString() { + version + ", shardFailures=" + shardFailures + + ", isRemoteStoreInteropEnabled=" + + remoteStoreIndexShallowCopy + '}'; } @@ -643,6 +674,9 @@ public XContentBuilder toXContent(final XContentBuilder builder, final Params pa } builder.endArray(); } + if (remoteStoreIndexShallowCopy != null) { + builder.field(REMOTE_STORE_INDEX_SHALLOW_COPY, remoteStoreIndexShallowCopy); + } if (verbose || totalShards != 0) { builder.startObject(SHARDS); builder.field(TOTAL, totalShards); @@ -687,6 +721,9 @@ private XContentBuilder toXContentInternal(final XContentBuilder builder, final shardFailure.toXContent(builder, params); } builder.endArray(); + if (remoteStoreIndexShallowCopy != null) { + builder.field(REMOTE_STORE_INDEX_SHALLOW_COPY, remoteStoreIndexShallowCopy); + } builder.endObject(); return builder; } @@ -709,6 +746,7 @@ public static SnapshotInfo fromXContentInternal(final XContentParser parser) thr int totalShards = 0; int successfulShards = 0; Boolean includeGlobalState = null; + Boolean remoteStoreIndexShallowCopy = null; Map userMetadata = null; List shardFailures = Collections.emptyList(); if (parser.currentToken() == null) { // fresh parser? move to the first token @@ -746,6 +784,8 @@ public static SnapshotInfo fromXContentInternal(final XContentParser parser) thr version = Version.fromId(parser.intValue()); } else if (INCLUDE_GLOBAL_STATE.equals(currentFieldName)) { includeGlobalState = parser.booleanValue(); + } else if (REMOTE_STORE_INDEX_SHALLOW_COPY.equals(currentFieldName)) { + remoteStoreIndexShallowCopy = parser.booleanValue(); } } else if (token == XContentParser.Token.START_ARRAY) { if (DATA_STREAMS.equals(currentFieldName)) { @@ -797,7 +837,8 @@ public static SnapshotInfo fromXContentInternal(final XContentParser parser) thr successfulShards, shardFailures, includeGlobalState, - userMetadata + userMetadata, + remoteStoreIndexShallowCopy ); } @@ -826,6 +867,9 @@ public void writeTo(final StreamOutput out) throws IOException { out.writeOptionalBoolean(includeGlobalState); out.writeMap(userMetadata); out.writeStringCollection(dataStreams); + if (out.getVersion().onOrAfter(Version.V_2_8_0)) { + out.writeOptionalBoolean(remoteStoreIndexShallowCopy); + } } private static SnapshotState snapshotState(final String reason, final List shardFailures) { @@ -857,7 +901,8 @@ public boolean equals(Object o) { && Objects.equals(includeGlobalState, that.includeGlobalState) && Objects.equals(version, that.version) && Objects.equals(shardFailures, that.shardFailures) - && Objects.equals(userMetadata, that.userMetadata); + && Objects.equals(userMetadata, that.userMetadata) + && Objects.equals(remoteStoreIndexShallowCopy, that.remoteStoreIndexShallowCopy); } @Override @@ -876,7 +921,8 @@ public int hashCode() { includeGlobalState, version, shardFailures, - userMetadata + userMetadata, + remoteStoreIndexShallowCopy ); } } diff --git a/server/src/main/java/org/opensearch/snapshots/SnapshotShardsService.java b/server/src/main/java/org/opensearch/snapshots/SnapshotShardsService.java index 48589736166e8..9baf3f69ab7d0 100644 --- a/server/src/main/java/org/opensearch/snapshots/SnapshotShardsService.java +++ b/server/src/main/java/org/opensearch/snapshots/SnapshotShardsService.java @@ -36,6 +36,8 @@ import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.lucene.index.IndexCommit; +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.FilterDirectory; import org.opensearch.Version; import org.opensearch.action.ActionListener; import org.opensearch.cluster.ClusterChangedEvent; @@ -62,6 +64,8 @@ import org.opensearch.index.snapshots.IndexShardSnapshotFailedException; import org.opensearch.index.snapshots.IndexShardSnapshotStatus; import org.opensearch.index.snapshots.IndexShardSnapshotStatus.Stage; +import org.opensearch.index.store.RemoteSegmentStoreDirectory; +import org.opensearch.index.store.Store; import org.opensearch.indices.IndicesService; import org.opensearch.repositories.IndexId; import org.opensearch.repositories.RepositoriesService; @@ -279,38 +283,47 @@ private void startNewShards(SnapshotsInProgress.Entry entry, Map() { - @Override - public void onResponse(String newGeneration) { - assert newGeneration != null; - assert newGeneration.equals(snapshotStatus.generation()); - if (logger.isDebugEnabled()) { - final IndexShardSnapshotStatus.Copy lastSnapshotStatus = snapshotStatus.asCopy(); - logger.debug( - "snapshot [{}] completed to [{}] with [{}] at generation [{}]", - snapshot, - snapshot.getRepository(), - lastSnapshotStatus, - snapshotStatus.generation() - ); + snapshot( + shardId, + snapshot, + indexId, + entry.userMetadata(), + entry.remoteStoreIndexShallowCopy(), + snapshotStatus, + entry.version(), + new ActionListener<>() { + @Override + public void onResponse(String newGeneration) { + assert newGeneration != null; + assert newGeneration.equals(snapshotStatus.generation()); + if (logger.isDebugEnabled()) { + final IndexShardSnapshotStatus.Copy lastSnapshotStatus = snapshotStatus.asCopy(); + logger.debug( + "snapshot [{}] completed to [{}] with [{}] at generation [{}]", + snapshot, + snapshot.getRepository(), + lastSnapshotStatus, + snapshotStatus.generation() + ); + } + notifySuccessfulSnapshotShard(snapshot, shardId, newGeneration); } - notifySuccessfulSnapshotShard(snapshot, shardId, newGeneration); - } - @Override - public void onFailure(Exception e) { - final String failure; - if (e instanceof AbortedSnapshotException) { - failure = "aborted"; - logger.debug(() -> new ParameterizedMessage("[{}][{}] aborted shard snapshot", shardId, snapshot), e); - } else { - failure = summarizeFailure(e); - logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to snapshot shard", shardId, snapshot), e); + @Override + public void onFailure(Exception e) { + final String failure; + if (e instanceof AbortedSnapshotException) { + failure = "aborted"; + logger.debug(() -> new ParameterizedMessage("[{}][{}] aborted shard snapshot", shardId, snapshot), e); + } else { + failure = summarizeFailure(e); + logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to snapshot shard", shardId, snapshot), e); + } + snapshotStatus.moveToFailed(threadPool.absoluteTimeInMillis(), failure); + notifyFailedSnapshotShard(snapshot, shardId, failure); } - snapshotStatus.moveToFailed(threadPool.absoluteTimeInMillis(), failure); - notifyFailedSnapshotShard(snapshot, shardId, failure); } - }); + ); } } }); @@ -360,6 +373,7 @@ private void snapshot( final Snapshot snapshot, final IndexId indexId, final Map userMetadata, + final boolean remoteStoreIndexShallowCopy, final IndexShardSnapshotStatus snapshotStatus, Version version, ActionListener listener @@ -384,8 +398,35 @@ private void snapshot( GatedCloseable wrappedSnapshot = null; try { // we flush first to make sure we get the latest writes snapshotted - wrappedSnapshot = indexShard.acquireLastIndexCommit(true); + if (remoteStoreIndexShallowCopy && indexShard.indexSettings().isRemoteStoreEnabled()) { + wrappedSnapshot = indexShard.acquireLastIndexCommitAndRefresh(true); + } else { + wrappedSnapshot = indexShard.acquireLastIndexCommit(true); + } + final IndexCommit snapshotIndexCommit = wrappedSnapshot.get(); + if (remoteStoreIndexShallowCopy && indexShard.indexSettings().isRemoteStoreEnabled()) { + acquireLockOnCommitData( + indexShard.remoteStore(), + snapshot.getSnapshotId().getUUID(), + indexShard.getOperationPrimaryTerm(), + snapshotIndexCommit.getGeneration() + ); + repository.snapshotRemoteStoreIndexShard( + indexShard.store(), + indexShard.mapperService(), + snapshot.getSnapshotId(), + indexId, + wrappedSnapshot.get(), + getShardStateId(indexShard, snapshotIndexCommit), + snapshotStatus, + version, + userMetadata, + indexShard.getOperationPrimaryTerm(), + ActionListener.runBefore(listener, wrappedSnapshot::close) + ); + return; + } repository.snapshotShard( indexShard.store(), indexShard.mapperService(), @@ -407,6 +448,18 @@ private void snapshot( } } + private void acquireLockOnCommitData(Store remoteStore, String snapshotId, long primaryTerm, long generation) throws IOException { + assert remoteStore.directory() instanceof FilterDirectory : "Store.directory is not an instance of " + "FilterDirectory"; + FilterDirectory remoteStoreDirectory = (FilterDirectory) remoteStore.directory(); + assert remoteStoreDirectory.getDelegate() instanceof FilterDirectory + : "Store.directory is not enclosing an instance of FilterDirectory"; + FilterDirectory byteSizeCachingStoreDirectory = (FilterDirectory) remoteStoreDirectory.getDelegate(); + final Directory remoteDirectory = byteSizeCachingStoreDirectory.getDelegate(); + assert remoteDirectory instanceof RemoteSegmentStoreDirectory : "remoteDirectory is not an instance of " + + "RemoteSegmentStoreDirectory"; + ((RemoteSegmentStoreDirectory) remoteDirectory).acquireLock(primaryTerm, generation, snapshotId); + } + /** * Generates an identifier from the current state of a shard that can be used to detect whether a shard's contents * have changed between two snapshots. diff --git a/server/src/main/java/org/opensearch/snapshots/SnapshotsService.java b/server/src/main/java/org/opensearch/snapshots/SnapshotsService.java index 8a5cc0bf9517a..b68a7e81dbd63 100644 --- a/server/src/main/java/org/opensearch/snapshots/SnapshotsService.java +++ b/server/src/main/java/org/opensearch/snapshots/SnapshotsService.java @@ -129,6 +129,7 @@ import static java.util.Collections.emptySet; import static java.util.Collections.unmodifiableList; import static org.opensearch.cluster.SnapshotsInProgress.completed; +import static org.opensearch.repositories.blobstore.BlobStoreRepository.REMOTE_STORE_INDEX_SHALLOW_COPY; import static org.opensearch.snapshots.SnapshotUtils.validateSnapshotsBackingAnyIndex; /** @@ -261,6 +262,7 @@ public void createSnapshot(final CreateSnapshotRequest request, final ActionList // retries final SnapshotId snapshotId = new SnapshotId(snapshotName, UUIDs.randomBase64UUID()); // new UUID for the snapshot Repository repository = repositoriesService.repository(request.repository()); + if (repository.isReadOnly()) { listener.onFailure(new RepositoryException(repository.getMetadata().name(), "cannot create snapshot in a readonly repository")); return; @@ -334,6 +336,13 @@ public ClusterState execute(ClusterState currentState) { ); } } + + Boolean remoteStoreIndexShallowCopy = request.remoteStoreIndexShallowCopy(); + + if (remoteStoreIndexShallowCopy == null) { + remoteStoreIndexShallowCopy = REMOTE_STORE_INDEX_SHALLOW_COPY.get(repository.getMetadata().settings()); + } + newEntry = SnapshotsInProgress.startedEntry( new Snapshot(repositoryName, snapshotId), request.includeGlobalState(), @@ -344,7 +353,8 @@ public ClusterState execute(ClusterState currentState) { repositoryData.getGenId(), shards, userMeta, - version + version, + remoteStoreIndexShallowCopy ); final List newEntries = new ArrayList<>(runningSnapshots); newEntries.add(newEntry); @@ -1396,7 +1406,8 @@ private void finalizeSnapshotEntry(SnapshotsInProgress.Entry entry, Metadata met entry.partial() ? shardGenerations.totalShards() : entry.shards().size(), shardFailures, entry.includeGlobalState(), - entry.userMetadata() + entry.userMetadata(), + entry.remoteStoreIndexShallowCopy() ); final StepListener metadataListener = new StepListener<>(); final Repository repo = repositoriesService.repository(snapshot.getRepository()); diff --git a/server/src/test/java/org/opensearch/action/admin/cluster/snapshots/create/CreateSnapshotResponseTests.java b/server/src/test/java/org/opensearch/action/admin/cluster/snapshots/create/CreateSnapshotResponseTests.java index 64d199a51493a..7a294094a21d8 100644 --- a/server/src/test/java/org/opensearch/action/admin/cluster/snapshots/create/CreateSnapshotResponseTests.java +++ b/server/src/test/java/org/opensearch/action/admin/cluster/snapshots/create/CreateSnapshotResponseTests.java @@ -94,7 +94,8 @@ protected CreateSnapshotResponse createTestInstance() { totalShards, shardFailures, globalState, - SnapshotInfoTests.randomUserMetadata() + SnapshotInfoTests.randomUserMetadata(), + false ) ); } diff --git a/server/src/test/java/org/opensearch/action/admin/cluster/snapshots/get/GetSnapshotsResponseTests.java b/server/src/test/java/org/opensearch/action/admin/cluster/snapshots/get/GetSnapshotsResponseTests.java index 1116e156b28cc..02b8f47f0e1de 100644 --- a/server/src/test/java/org/opensearch/action/admin/cluster/snapshots/get/GetSnapshotsResponseTests.java +++ b/server/src/test/java/org/opensearch/action/admin/cluster/snapshots/get/GetSnapshotsResponseTests.java @@ -76,7 +76,8 @@ protected GetSnapshotsResponse createTestInstance() { randomIntBetween(2, 3), shardFailures, randomBoolean(), - SnapshotInfoTests.randomUserMetadata() + SnapshotInfoTests.randomUserMetadata(), + false ) ); } diff --git a/server/src/test/java/org/opensearch/action/admin/indices/datastream/DeleteDataStreamRequestTests.java b/server/src/test/java/org/opensearch/action/admin/indices/datastream/DeleteDataStreamRequestTests.java index 6cf490843affa..008046aa83dfe 100644 --- a/server/src/test/java/org/opensearch/action/admin/indices/datastream/DeleteDataStreamRequestTests.java +++ b/server/src/test/java/org/opensearch/action/admin/indices/datastream/DeleteDataStreamRequestTests.java @@ -171,7 +171,7 @@ private SnapshotsInProgress.Entry createEntry(String dataStreamName, String repo Map.of(), null, null, - null + false ); } diff --git a/server/src/test/java/org/opensearch/cluster/metadata/MetadataDeleteIndexServiceTests.java b/server/src/test/java/org/opensearch/cluster/metadata/MetadataDeleteIndexServiceTests.java index c309b90f1a777..b3772edcbe27c 100644 --- a/server/src/test/java/org/opensearch/cluster/metadata/MetadataDeleteIndexServiceTests.java +++ b/server/src/test/java/org/opensearch/cluster/metadata/MetadataDeleteIndexServiceTests.java @@ -111,7 +111,8 @@ public void testDeleteSnapshotting() { Map.of(), null, SnapshotInfoTests.randomUserMetadata(), - VersionUtils.randomVersion(random()) + VersionUtils.randomVersion(random()), + false ) ) ); diff --git a/server/src/test/java/org/opensearch/cluster/metadata/MetadataIndexStateServiceTests.java b/server/src/test/java/org/opensearch/cluster/metadata/MetadataIndexStateServiceTests.java index 0e2f06ba59d89..5cffa1931dd39 100644 --- a/server/src/test/java/org/opensearch/cluster/metadata/MetadataIndexStateServiceTests.java +++ b/server/src/test/java/org/opensearch/cluster/metadata/MetadataIndexStateServiceTests.java @@ -439,7 +439,8 @@ private static ClusterState addSnapshotIndex(final String index, final int numSh shardsBuilder, null, SnapshotInfoTests.randomUserMetadata(), - VersionUtils.randomVersion(random()) + VersionUtils.randomVersion(random()), + false ); return ClusterState.builder(newState) .putCustom(SnapshotsInProgress.TYPE, SnapshotsInProgress.of(Collections.singletonList(entry))) diff --git a/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java index eb7ff360ec5d9..f709318b783b8 100644 --- a/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java @@ -127,6 +127,7 @@ import org.opensearch.index.seqno.RetentionLeases; import org.opensearch.index.seqno.SeqNoStats; import org.opensearch.index.seqno.SequenceNumbers; +import org.opensearch.index.snapshots.blobstore.RemoteStoreShardShallowCopySnapshot; import org.opensearch.index.store.Store; import org.opensearch.index.store.StoreStats; import org.opensearch.index.store.StoreUtils; @@ -2777,6 +2778,15 @@ public void restoreShard( return null; }); } + + @Override + public RemoteStoreShardShallowCopySnapshot getRemoteStoreShallowCopyShardMetadata( + SnapshotId snapshotId, + IndexId indexId, + ShardId snapshotShardId + ) { + return null; + } }, future); assertTrue(future.actionGet()); assertThat(target.getLocalCheckpoint(), equalTo(2L)); diff --git a/server/src/test/java/org/opensearch/repositories/RepositoriesServiceTests.java b/server/src/test/java/org/opensearch/repositories/RepositoriesServiceTests.java index 03636c97d1319..a5dc69ae06ae3 100644 --- a/server/src/test/java/org/opensearch/repositories/RepositoriesServiceTests.java +++ b/server/src/test/java/org/opensearch/repositories/RepositoriesServiceTests.java @@ -58,6 +58,7 @@ import org.opensearch.index.mapper.MapperService; import org.opensearch.index.shard.ShardId; import org.opensearch.index.snapshots.IndexShardSnapshotStatus; +import org.opensearch.index.snapshots.blobstore.RemoteStoreShardShallowCopySnapshot; import org.opensearch.index.store.Store; import org.opensearch.indices.recovery.RecoverySettings; import org.opensearch.indices.recovery.RecoveryState; @@ -315,6 +316,23 @@ public void snapshotShard( } + @Override + public void snapshotRemoteStoreIndexShard( + Store store, + MapperService mapperService, + SnapshotId snapshotId, + IndexId indexId, + IndexCommit snapshotIndexCommit, + String shardStateIdentifier, + IndexShardSnapshotStatus snapshotStatus, + Version repositoryMetaVersion, + Map userMetadata, + long primaryTerm, + ActionListener listener + ) { + + } + @Override public void restoreShard( Store store, @@ -327,6 +345,15 @@ public void restoreShard( } + @Override + public RemoteStoreShardShallowCopySnapshot getRemoteStoreShallowCopyShardMetadata( + SnapshotId snapshotId, + IndexId indexId, + ShardId snapshotShardId + ) { + return null; + } + @Override public IndexShardSnapshotStatus getShardSnapshotStatus(SnapshotId snapshotId, IndexId indexId, ShardId shardId) { return null; diff --git a/server/src/test/java/org/opensearch/repositories/blobstore/BlobStoreRepositoryRestoreTests.java b/server/src/test/java/org/opensearch/repositories/blobstore/BlobStoreRepositoryRestoreTests.java index 63b144dae9c93..c3bd4dcaf530d 100644 --- a/server/src/test/java/org/opensearch/repositories/blobstore/BlobStoreRepositoryRestoreTests.java +++ b/server/src/test/java/org/opensearch/repositories/blobstore/BlobStoreRepositoryRestoreTests.java @@ -212,7 +212,8 @@ public void testSnapshotWithConflictingName() throws Exception { 6, Collections.emptyList(), true, - Collections.emptyMap() + Collections.emptyMap(), + false ), Version.CURRENT, Function.identity(), diff --git a/server/src/test/java/org/opensearch/snapshots/SnapshotInfoTests.java b/server/src/test/java/org/opensearch/snapshots/SnapshotInfoTests.java index 8320d73d396ea..c7329de8e505e 100644 --- a/server/src/test/java/org/opensearch/snapshots/SnapshotInfoTests.java +++ b/server/src/test/java/org/opensearch/snapshots/SnapshotInfoTests.java @@ -83,7 +83,8 @@ protected SnapshotInfo createTestInstance() { totalShards, shardFailures, includeGlobalState, - userMetadata + userMetadata, + false ); } @@ -110,7 +111,8 @@ protected SnapshotInfo mutateInstance(SnapshotInfo instance) { instance.totalShards(), instance.shardFailures(), instance.includeGlobalState(), - instance.userMetadata() + instance.userMetadata(), + instance.isRemoteStoreIndexShallowCopyEnabled() ); case 1: int indicesSize = randomValueOtherThan(instance.indices().size(), () -> randomIntBetween(1, 10)); @@ -127,7 +129,8 @@ protected SnapshotInfo mutateInstance(SnapshotInfo instance) { instance.totalShards(), instance.shardFailures(), instance.includeGlobalState(), - instance.userMetadata() + instance.userMetadata(), + instance.isRemoteStoreIndexShallowCopyEnabled() ); case 2: return new SnapshotInfo( @@ -140,7 +143,8 @@ protected SnapshotInfo mutateInstance(SnapshotInfo instance) { instance.totalShards(), instance.shardFailures(), instance.includeGlobalState(), - instance.userMetadata() + instance.userMetadata(), + instance.isRemoteStoreIndexShallowCopyEnabled() ); case 3: return new SnapshotInfo( @@ -153,7 +157,8 @@ protected SnapshotInfo mutateInstance(SnapshotInfo instance) { instance.totalShards(), instance.shardFailures(), instance.includeGlobalState(), - instance.userMetadata() + instance.userMetadata(), + instance.isRemoteStoreIndexShallowCopyEnabled() ); case 4: return new SnapshotInfo( @@ -166,7 +171,8 @@ protected SnapshotInfo mutateInstance(SnapshotInfo instance) { instance.totalShards(), instance.shardFailures(), instance.includeGlobalState(), - instance.userMetadata() + instance.userMetadata(), + instance.isRemoteStoreIndexShallowCopyEnabled() ); case 5: int totalShards = randomValueOtherThan(instance.totalShards(), () -> randomIntBetween(0, 100)); @@ -191,7 +197,8 @@ protected SnapshotInfo mutateInstance(SnapshotInfo instance) { totalShards, shardFailures, instance.includeGlobalState(), - instance.userMetadata() + instance.userMetadata(), + instance.isRemoteStoreIndexShallowCopyEnabled() ); case 6: return new SnapshotInfo( @@ -204,7 +211,8 @@ protected SnapshotInfo mutateInstance(SnapshotInfo instance) { instance.totalShards(), instance.shardFailures(), Boolean.FALSE.equals(instance.includeGlobalState()), - instance.userMetadata() + instance.userMetadata(), + instance.isRemoteStoreIndexShallowCopyEnabled() ); case 7: return new SnapshotInfo( @@ -217,7 +225,8 @@ protected SnapshotInfo mutateInstance(SnapshotInfo instance) { instance.totalShards(), instance.shardFailures(), instance.includeGlobalState(), - randomValueOtherThan(instance.userMetadata(), SnapshotInfoTests::randomUserMetadata) + randomValueOtherThan(instance.userMetadata(), SnapshotInfoTests::randomUserMetadata), + instance.isRemoteStoreIndexShallowCopyEnabled() ); case 8: List dataStreams = randomValueOtherThan( @@ -234,7 +243,8 @@ protected SnapshotInfo mutateInstance(SnapshotInfo instance) { instance.totalShards(), instance.shardFailures(), instance.includeGlobalState(), - instance.userMetadata() + instance.userMetadata(), + instance.isRemoteStoreIndexShallowCopyEnabled() ); default: throw new IllegalArgumentException("invalid randomization case"); diff --git a/server/src/test/java/org/opensearch/snapshots/SnapshotsInProgressSerializationTests.java b/server/src/test/java/org/opensearch/snapshots/SnapshotsInProgressSerializationTests.java index d02b82b45d90e..b2be4f63491ef 100644 --- a/server/src/test/java/org/opensearch/snapshots/SnapshotsInProgressSerializationTests.java +++ b/server/src/test/java/org/opensearch/snapshots/SnapshotsInProgressSerializationTests.java @@ -115,7 +115,8 @@ private Entry randomSnapshot() { shards, null, SnapshotInfoTests.randomUserMetadata(), - VersionUtils.randomVersion(random()) + VersionUtils.randomVersion(random()), + false ); } diff --git a/server/src/test/java/org/opensearch/snapshots/SnapshotsServiceTests.java b/server/src/test/java/org/opensearch/snapshots/SnapshotsServiceTests.java index bbdb71c2024cf..9b12212c791a2 100644 --- a/server/src/test/java/org/opensearch/snapshots/SnapshotsServiceTests.java +++ b/server/src/test/java/org/opensearch/snapshots/SnapshotsServiceTests.java @@ -484,7 +484,8 @@ private static SnapshotsInProgress.Entry snapshotEntry( randomNonNegativeLong(), shards, Collections.emptyMap(), - Version.CURRENT + Version.CURRENT, + false ); } diff --git a/server/src/test/java/org/opensearch/snapshots/mockstore/MockEventuallyConsistentRepositoryTests.java b/server/src/test/java/org/opensearch/snapshots/mockstore/MockEventuallyConsistentRepositoryTests.java index 0dc8d435ec783..43dde7281fb2d 100644 --- a/server/src/test/java/org/opensearch/snapshots/mockstore/MockEventuallyConsistentRepositoryTests.java +++ b/server/src/test/java/org/opensearch/snapshots/mockstore/MockEventuallyConsistentRepositoryTests.java @@ -232,7 +232,8 @@ public void testOverwriteSnapshotInfoBlob() throws Exception { 5, Collections.emptyList(), true, - Collections.emptyMap() + Collections.emptyMap(), + false ), Version.CURRENT, Function.identity(), @@ -257,7 +258,8 @@ public void testOverwriteSnapshotInfoBlob() throws Exception { 6, Collections.emptyList(), true, - Collections.emptyMap() + Collections.emptyMap(), + false ), Version.CURRENT, Function.identity(), @@ -284,7 +286,8 @@ public void testOverwriteSnapshotInfoBlob() throws Exception { 5, Collections.emptyList(), true, - Collections.emptyMap() + Collections.emptyMap(), + false ), Version.CURRENT, Function.identity(), diff --git a/test/framework/src/main/java/org/opensearch/index/shard/RestoreOnlyRepository.java b/test/framework/src/main/java/org/opensearch/index/shard/RestoreOnlyRepository.java index 2a85fffa8699a..2e05de9eeda60 100644 --- a/test/framework/src/main/java/org/opensearch/index/shard/RestoreOnlyRepository.java +++ b/test/framework/src/main/java/org/opensearch/index/shard/RestoreOnlyRepository.java @@ -175,6 +175,21 @@ public void snapshotShard( ActionListener listener ) {} + @Override + public void snapshotRemoteStoreIndexShard( + Store store, + MapperService mapperService, + SnapshotId snapshotId, + IndexId indexId, + IndexCommit snapshotIndexCommit, + String shardStateIdentifier, + IndexShardSnapshotStatus snapshotStatus, + Version repositoryMetaVersion, + Map userMetadata, + long primaryTerm, + ActionListener listener + ) {} + @Override public IndexShardSnapshotStatus getShardSnapshotStatus(SnapshotId snapshotId, IndexId indexId, ShardId shardId) { return null; diff --git a/test/framework/src/main/java/org/opensearch/snapshots/AbstractSnapshotIntegTestCase.java b/test/framework/src/main/java/org/opensearch/snapshots/AbstractSnapshotIntegTestCase.java index f5497fb60d44d..9933297aa1c96 100644 --- a/test/framework/src/main/java/org/opensearch/snapshots/AbstractSnapshotIntegTestCase.java +++ b/test/framework/src/main/java/org/opensearch/snapshots/AbstractSnapshotIntegTestCase.java @@ -523,7 +523,8 @@ protected void addBwCFailedSnapshot(String repoName, String snapshotName, Mapget( f -> repo.finalizeSnapshot(