Skip to content

Commit

Permalink
Introduce experimental searchable snapshot API (opensearch-project#4680)
Browse files Browse the repository at this point in the history
This commit adds a new parameter to the snapshot restore API to restore
to a new type of "remote snapshot" index where, unlike traditional
snapshot restores, the index data is not fully downloaded to disk but is
instead read on demand at search time. The feature is functional with
this commit, and includes a simple end-to-end integration test, but is
far from complete. See tracking issue opensearch-project#2919 for the rest of the work
planned/underway.

All new capabilities are gated behind a new searchable snapshot feature
flag.

Signed-off-by: Andrew Ross <[email protected]>

Signed-off-by: Andrew Ross <[email protected]>
  • Loading branch information
andrross authored Oct 13, 2022
1 parent d15795a commit 431bdeb
Show file tree
Hide file tree
Showing 23 changed files with 705 additions and 67 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
- Recommission API changes for service layer ([#4320](https://github.com/opensearch-project/OpenSearch/pull/4320))
- Update GeoGrid base class access modifier to support extensibility ([#4572](https://github.com/opensearch-project/OpenSearch/pull/4572))
- Add a new node role 'search' which is dedicated to provide search capability ([#4689](https://github.com/opensearch-project/OpenSearch/pull/4689))
- Introduce experimental searchable snapshot API ([#4680](https://github.com/opensearch-project/OpenSearch/pull/4680))

### Dependencies
- Bumps `log4j-core` from 2.18.0 to 2.19.0
- Bumps `reactor-netty-http` from 1.0.18 to 1.0.23
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/
package org.opensearch.snapshots;

import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;

import org.hamcrest.MatcherAssert;
import org.junit.BeforeClass;
import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse;
import org.opensearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
import org.opensearch.action.admin.cluster.snapshots.restore.RestoreSnapshotRequest;
import org.opensearch.client.Client;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.routing.GroupShardsIterator;
import org.opensearch.cluster.routing.ShardIterator;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.common.io.PathUtils;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.index.Index;
import org.opensearch.monitor.fs.FsInfo;

import com.carrotsearch.randomizedtesting.generators.RandomPicks;

import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.is;
import static org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest.Metric.FS;
import static org.opensearch.common.util.CollectionUtils.iterableAsArrayList;

public final class SearchableSnapshotIT extends AbstractSnapshotIntegTestCase {

@BeforeClass
public static void assumeFeatureFlag() {
assumeTrue(
"Searchable snapshot feature flag is enabled",
Boolean.parseBoolean(System.getProperty(FeatureFlags.SEARCHABLE_SNAPSHOT))
);
}

@Override
protected boolean addMockInternalEngine() {
return false;
}

public void testCreateSearchableSnapshot() throws Exception {
final Client client = client();
createRepository("test-repo", "fs");
createIndex(
"test-idx-1",
Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, "0").put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, "1").build()
);
createIndex(
"test-idx-2",
Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, "0").put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, "1").build()
);
ensureGreen();
indexRandomDocs("test-idx-1", 100);
indexRandomDocs("test-idx-2", 100);

logger.info("--> snapshot");
final CreateSnapshotResponse createSnapshotResponse = client.admin()
.cluster()
.prepareCreateSnapshot("test-repo", "test-snap")
.setWaitForCompletion(true)
.setIndices("test-idx-1", "test-idx-2")
.get();
MatcherAssert.assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), greaterThan(0));
MatcherAssert.assertThat(
createSnapshotResponse.getSnapshotInfo().successfulShards(),
equalTo(createSnapshotResponse.getSnapshotInfo().totalShards())
);

assertTrue(client.admin().indices().prepareDelete("test-idx-1", "test-idx-2").get().isAcknowledged());

logger.info("--> restore indices as 'remote_snapshot'");
client.admin()
.cluster()
.prepareRestoreSnapshot("test-repo", "test-snap")
.setRenamePattern("(.+)")
.setRenameReplacement("$1-copy")
.setStorageType(RestoreSnapshotRequest.StorageType.REMOTE_SNAPSHOT)
.setWaitForCompletion(true)
.execute()
.actionGet();
ensureGreen();

assertDocCount("test-idx-1-copy", 100L);
assertDocCount("test-idx-2-copy", 100L);
assertIndexDirectoryDoesNotExist("test-idx-1-copy", "test-idx-2-copy");
}

/**
* Picks a shard out of the cluster state for each given index and asserts
* that the 'index' directory does not exist in the node's file system.
* This assertion is digging a bit into the implementation details to
* verify that the Lucene segment files are not copied from the snapshot
* repository to the node's local disk for a remote snapshot index.
*/
private void assertIndexDirectoryDoesNotExist(String... indexNames) {
final ClusterState state = client().admin().cluster().prepareState().get().getState();
for (String indexName : indexNames) {
final Index index = state.metadata().index(indexName).getIndex();
// Get the primary shards for the given index
final GroupShardsIterator<ShardIterator> shardIterators = state.getRoutingTable()
.activePrimaryShardsGrouped(new String[] { indexName }, false);
// Randomly pick one of the shards
final List<ShardIterator> iterators = iterableAsArrayList(shardIterators);
final ShardIterator shardIterator = RandomPicks.randomFrom(random(), iterators);
final ShardRouting shardRouting = shardIterator.nextOrNull();
assertNotNull(shardRouting);
assertTrue(shardRouting.primary());
assertTrue(shardRouting.assignedToNode());
// Get the file system stats for the assigned node
final String nodeId = shardRouting.currentNodeId();
final NodesStatsResponse nodeStats = client().admin().cluster().prepareNodesStats(nodeId).addMetric(FS.metricName()).get();
for (FsInfo.Path info : nodeStats.getNodes().get(0).getFs()) {
// Build the expected path for the index data for a "normal"
// index and assert it does not exist
final String path = info.getPath();
final Path file = PathUtils.get(path)
.resolve("indices")
.resolve(index.getUUID())
.resolve(Integer.toString(shardRouting.getId()))
.resolve("index");
MatcherAssert.assertThat("Expect file not to exist: " + file, Files.exists(file), is(false));
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
package org.opensearch.action.admin.cluster.snapshots.restore;

import org.opensearch.LegacyESVersion;
import org.opensearch.Version;
import org.opensearch.action.ActionRequestValidationException;
import org.opensearch.action.support.IndicesOptions;
import org.opensearch.action.support.clustermanager.ClusterManagerNodeRequest;
Expand All @@ -42,6 +43,7 @@
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.common.logging.DeprecationLogger;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.common.xcontent.ToXContentObject;
import org.opensearch.common.xcontent.XContentBuilder;
import org.opensearch.common.xcontent.XContentType;
Expand All @@ -68,6 +70,38 @@ public class RestoreSnapshotRequest extends ClusterManagerNodeRequest<RestoreSna

private static final DeprecationLogger DEPRECATION_LOGGER = DeprecationLogger.getLogger(RestoreSnapshotRequest.class);

/**
 * The storage types a restore request can ask for. Each constant carries the
 * lower-case text used for the {@code storage_type} field and returned by
 * {@link #toString()}.
 * <p>
 * NOTE(review): the enclosing request passes this enum to
 * {@code writeEnum}/{@code readEnum}; presumably the declaration order of the
 * constants is part of the wire format -- confirm before reordering.
 */
public enum StorageType {
    LOCAL("local"),
    REMOTE_SNAPSHOT("remote_snapshot");

    /** Textual form used in request bodies and in {@link #toString()}. */
    private final String label;

    StorageType(String label) {
        this.label = label;
    }

    /**
     * @return the textual form of this storage type
     */
    @Override
    public String toString() {
        return label;
    }

    /**
     * Writes this storage type to the builder as the {@code storage_type} field.
     *
     * @param builder the builder to append the field to
     * @throws IOException declared for the underlying {@code builder.field} call
     */
    private void toXContent(XContentBuilder builder) throws IOException {
        builder.field("storage_type", label);
    }

    /**
     * Resolves a storage type from its textual form using an exact,
     * case-sensitive comparison.
     *
     * @param string the text to resolve; a {@code null} value matches nothing
     * @return the constant whose text equals {@code string}
     * @throws IllegalArgumentException if no constant matches
     */
    private static StorageType fromString(String string) {
        StorageType match = null;
        for (StorageType candidate : StorageType.values()) {
            if (candidate.label.equals(string)) {
                match = candidate;
                break;
            }
        }
        if (match == null) {
            throw new IllegalArgumentException("Invalid storage_type: " + string);
        }
        return match;
    }
}

private String snapshot;
private String repository;
private String[] indices = Strings.EMPTY_ARRAY;
Expand All @@ -80,6 +114,7 @@ public class RestoreSnapshotRequest extends ClusterManagerNodeRequest<RestoreSna
private boolean includeAliases = true;
private Settings indexSettings = EMPTY_SETTINGS;
private String[] ignoreIndexSettings = Strings.EMPTY_ARRAY;
private StorageType storageType = StorageType.LOCAL;

@Nullable // if any snapshot UUID will do
private String snapshotUuid;
Expand Down Expand Up @@ -117,6 +152,9 @@ public RestoreSnapshotRequest(StreamInput in) throws IOException {
if (in.getVersion().onOrAfter(LegacyESVersion.V_7_10_0)) {
snapshotUuid = in.readOptionalString();
}
if (FeatureFlags.isEnabled(FeatureFlags.SEARCHABLE_SNAPSHOT) && in.getVersion().onOrAfter(Version.V_3_0_0)) {
storageType = in.readEnum(StorageType.class);
}
}

@Override
Expand Down Expand Up @@ -144,6 +182,9 @@ public void writeTo(StreamOutput out) throws IOException {
"restricting the snapshot UUID is forbidden in a cluster with version [" + out.getVersion() + "] nodes"
);
}
if (FeatureFlags.isEnabled(FeatureFlags.SEARCHABLE_SNAPSHOT) && out.getVersion().onOrAfter(Version.V_3_0_0)) {
out.writeEnum(storageType);
}
}

@Override
Expand Down Expand Up @@ -480,6 +521,22 @@ public String snapshotUuid() {
return snapshotUuid;
}

/**
* Sets the storage type for this request.
* <p>
* The method declares no access modifier, so it is package-private.
* NOTE(review): the argument is stored without a null check, while
* {@code writeTo} hands the field to {@code writeEnum}; presumably a null
* value would fail at serialization time -- confirm before passing null.
*
* @param storageType the storage type to record on this request
* @return this request, to allow call chaining
*/
RestoreSnapshotRequest storageType(StorageType storageType) {
this.storageType = storageType;
return this;
}

/**
* Gets the storage type for this request. {@link StorageType#LOCAL} is the
* implicit default if not overridden.
*
* @return the storage type currently held by this request
*/
public StorageType storageType() {
return storageType;
}

/**
* Parses restore definition
*
Expand Down Expand Up @@ -537,6 +594,18 @@ public RestoreSnapshotRequest source(Map<String, Object> source) {
} else {
throw new IllegalArgumentException("malformed ignore_index_settings section, should be an array of strings");
}
} else if (name.equals("storage_type")) {
if (FeatureFlags.isEnabled(FeatureFlags.SEARCHABLE_SNAPSHOT)) {
if (entry.getValue() instanceof String) {
storageType(StorageType.fromString((String) entry.getValue()));
} else {
throw new IllegalArgumentException("malformed storage_type");
}
} else {
throw new IllegalArgumentException(
"Unsupported parameter " + name + ". Feature flag is not enabled for this experimental feature"
);
}
} else {
if (IndicesOptions.isIndicesOptions(name) == false) {
throw new IllegalArgumentException("Unknown parameter " + name);
Expand Down Expand Up @@ -579,6 +648,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.value(ignoreIndexSetting);
}
builder.endArray();
if (FeatureFlags.isEnabled(FeatureFlags.SEARCHABLE_SNAPSHOT) && storageType != null) {
storageType.toXContent(builder);
}
builder.endObject();
return builder;
}
Expand All @@ -605,7 +677,8 @@ public boolean equals(Object o) {
&& Objects.equals(renameReplacement, that.renameReplacement)
&& Objects.equals(indexSettings, that.indexSettings)
&& Arrays.equals(ignoreIndexSettings, that.ignoreIndexSettings)
&& Objects.equals(snapshotUuid, that.snapshotUuid);
&& Objects.equals(snapshotUuid, that.snapshotUuid)
&& Objects.equals(storageType, that.storageType);
}

@Override
Expand All @@ -621,7 +694,8 @@ public int hashCode() {
partial,
includeAliases,
indexSettings,
snapshotUuid
snapshotUuid,
storageType
);
result = 31 * result + Arrays.hashCode(indices);
result = 31 * result + Arrays.hashCode(ignoreIndexSettings);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -248,4 +248,12 @@ public RestoreSnapshotRequestBuilder setIgnoreIndexSettings(List<String> ignoreI
request.ignoreIndexSettings(ignoreIndexSettings);
return this;
}

/**
* Sets the storage type on the request being built.
*
* @param storageType value forwarded to {@code request.storageType(...)}
* @return this builder, to allow call chaining
*/
public RestoreSnapshotRequestBuilder setStorageType(RestoreSnapshotRequest.StorageType storageType) {
request.storageType(storageType);
return this;
}
}
Loading

0 comments on commit 431bdeb

Please sign in to comment.