Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement prototype remote store directory/index input for search #7417

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.search;

import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;
import org.junit.After;
import org.junit.Before;
import org.opensearch.action.admin.indices.close.CloseIndexRequest;
import org.opensearch.action.admin.indices.open.OpenIndexRequest;
import org.opensearch.action.admin.indices.settings.put.UpdateSettingsRequest;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.index.IndexModule;
import org.opensearch.index.store.remote.file.CleanerDaemonThreadLeakFilter;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.snapshots.AbstractSnapshotIntegTestCase;
import org.opensearch.test.OpenSearchIntegTestCase;

import java.nio.file.Path;

import static org.opensearch.index.IndexModule.INDEX_STORE_TYPE_SETTING;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;

/**
 * Integration test that indexes documents into a remote-store backed index, switches the index to the
 * {@code remote_search} store type while it is closed, and verifies the documents can still be counted
 * once the index has been reopened.
 */
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
@ThreadLeakFilters(filters = CleanerDaemonThreadLeakFilter.class)
public class RemoteSearchIT extends AbstractSnapshotIntegTestCase {

    private static final String REPOSITORY_NAME = "test-remote-store-repo";

    @Override
    protected boolean addMockInternalEngine() {
        return false;
    }

    @Override
    protected Settings featureFlagSettings() {
        return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.REMOTE_STORE, "true").build();
    }

    /**
     * Starts a cluster-manager-only node and registers the {@code fs} repository used as the remote store.
     */
    @Before
    public void setup() {
        internalCluster().startClusterManagerOnlyNode();
        Path absolutePath = randomRepoPath().toAbsolutePath();
        assertAcked(
            clusterAdmin().preparePutRepository(REPOSITORY_NAME).setType("fs").setSettings(Settings.builder().put("location", absolutePath))
        );
    }

    /**
     * Removes the repository registered in {@link #setup()}.
     */
    @After
    public void teardown() {
        assertAcked(clusterAdmin().prepareDeleteRepository(REPOSITORY_NAME));
    }

    /**
     * Builds settings for a single-shard, segment-replicated index whose segments are stored in
     * {@link #REPOSITORY_NAME}.
     *
     * @param numberOfReplicas number of replicas to configure on the index
     * @return the index settings
     */
    private Settings remoteStoreIndexSettings(int numberOfReplicas) {
        return Settings.builder()
            .put(super.indexSettings())
            .put("index.refresh_interval", "300s")
            .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
            .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, numberOfReplicas)
            .put(IndexModule.INDEX_QUERY_CACHE_ENABLED_SETTING.getKey(), false)
            .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
            .put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true)
            .put(IndexMetadata.SETTING_REMOTE_STORE_REPOSITORY, REPOSITORY_NAME)
            .build();
    }

    /**
     * Extends {@link #remoteStoreIndexSettings(int)} by also enabling the remote translog store in the
     * same repository.
     *
     * @param numberOfReplicas number of replicas to configure on the index
     * @return the index settings
     */
    private Settings remoteTranslogIndexSettings(int numberOfReplicas) {
        return Settings.builder()
            .put(remoteStoreIndexSettings(numberOfReplicas))
            .put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_ENABLED, true)
            .put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY, REPOSITORY_NAME)
            .build();
    }

    public void testRemoteSearchIndex() throws Exception {
        final String indexName = "test-idx-1";
        final int numReplicasIndex = randomIntBetween(0, 3);
        final int numOfDocs = 100;

        // Spin up enough search/data nodes to host the primary and every replica
        internalCluster().ensureAtLeastNumSearchAndDataNodes(numReplicasIndex + 1);

        // Create index with remote translog index settings
        createIndex(indexName, remoteTranslogIndexSettings(numReplicasIndex));
        ensureGreen(indexName);

        // Index some documents and verify they are searchable before switching the store type
        indexRandomDocs(indexName, numOfDocs);
        ensureGreen(indexName);
        assertDocCount(indexName, numOfDocs);

        // Close the index so that the store type can be changed
        final CloseIndexRequest closeIndexRequest = new CloseIndexRequest(indexName);
        assertAcked(client().admin().indices().close(closeIndexRequest).actionGet());

        // Apply the remote search setting to this index only; a request without index names targets all indices
        final Settings remoteSearchSettings = Settings.builder().put(INDEX_STORE_TYPE_SETTING.getKey(), "remote_search").build();
        final UpdateSettingsRequest updateSettingsRequest = new UpdateSettingsRequest(remoteSearchSettings, indexName);
        assertAcked(client().admin().indices().updateSettings(updateSettingsRequest).actionGet());

        // Open the index back and wait for its shards to be allocated before searching
        final OpenIndexRequest openIndexRequest = new OpenIndexRequest(indexName);
        assertAcked(client().admin().indices().open(openIndexRequest).actionGet());
        ensureGreen(indexName);

        // Perform search on the index again
        assertDocCount(indexName, numOfDocs);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -399,6 +399,9 @@ public Builder addBlocks(IndexMetadata indexMetadata) {
if (IndexModule.Type.REMOTE_SNAPSHOT.match(indexMetadata.getSettings().get(IndexModule.INDEX_STORE_TYPE_SETTING.getKey()))) {
addIndexBlock(indexName, IndexMetadata.REMOTE_READ_ONLY_ALLOW_DELETE);
}
if (IndexModule.Type.REMOTE_SEARCH.match(indexMetadata.getSettings().get(IndexModule.INDEX_STORE_TYPE_SETTING.getKey()))) {
addIndexBlock(indexName, IndexMetadata.REMOTE_READ_ONLY_ALLOW_DELETE);
}
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ public static RoutingPool getShardPool(ShardRouting shard, RoutingAllocation all
*/
public static RoutingPool getIndexPool(IndexMetadata indexMetadata) {
Settings indexSettings = indexMetadata.getSettings();
if (IndexModule.Type.REMOTE_SNAPSHOT.match(indexSettings.get(IndexModule.INDEX_STORE_TYPE_SETTING.getKey()))) {
if (IndexModule.Type.REMOTE_SNAPSHOT.match(indexSettings.get(IndexModule.INDEX_STORE_TYPE_SETTING.getKey())) ||
IndexModule.Type.REMOTE_SEARCH.match(indexSettings.get(IndexModule.INDEX_STORE_TYPE_SETTING.getKey()))) {
return REMOTE_CAPABLE;
}
return LOCAL_ONLY;
Expand Down
10 changes: 9 additions & 1 deletion server/src/main/java/org/opensearch/index/IndexModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
import org.opensearch.index.shard.SearchOperationListener;
import org.opensearch.index.similarity.SimilarityService;
import org.opensearch.index.store.FsDirectoryFactory;
import org.opensearch.index.store.remote.directory.RemoteSearchDirectoryFactory;
import org.opensearch.index.store.remote.directory.RemoteSnapshotDirectoryFactory;
import org.opensearch.index.store.remote.filecache.FileCache;
import org.opensearch.index.translog.TranslogFactory;
Expand Down Expand Up @@ -407,7 +408,8 @@ public enum Type {
MMAPFS("mmapfs"),
SIMPLEFS("simplefs"),
FS("fs"),
REMOTE_SNAPSHOT("remote_snapshot");
REMOTE_SNAPSHOT("remote_snapshot"),
REMOTE_SEARCH("remote_search");

private final String settingsKey;
private final boolean deprecated;
Expand Down Expand Up @@ -683,6 +685,12 @@ public static Map<String, IndexStorePlugin.DirectoryFactory> createBuiltInDirect
new RemoteSnapshotDirectoryFactory(repositoriesService, threadPool, remoteStoreFileCache)
);
break;
case REMOTE_SEARCH:
factories.put(
type.getSettingsKey(),
new RemoteSearchDirectoryFactory(repositoriesService, remoteStoreFileCache)
);
break;
default:
throw new IllegalStateException("No directory factory mapping for built-in type " + type);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -591,6 +591,7 @@ public final class IndexSettings {
private final String remoteStoreTranslogRepository;
private final String remoteStoreRepository;
private final boolean isRemoteSnapshot;
private final boolean isRemoteIndex;
private Version extendedCompatibilitySnapshotVersion;

// volatile fields are updated via #updateIndexMetadata(IndexMetadata) under lock
Expand Down Expand Up @@ -768,6 +769,7 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti
);
remoteStoreRepository = settings.get(IndexMetadata.SETTING_REMOTE_STORE_REPOSITORY);
isRemoteSnapshot = IndexModule.Type.REMOTE_SNAPSHOT.match(this.settings);
isRemoteIndex = IndexModule.Type.REMOTE_SEARCH.match(this.settings);

if (isRemoteSnapshot && FeatureFlags.isEnabled(SEARCHABLE_SNAPSHOT_EXTENDED_COMPATIBILITY)) {
extendedCompatibilitySnapshotVersion = SEARCHABLE_SNAPSHOT_EXTENDED_COMPATIBILITY_MINIMUM_VERSION;
Expand Down Expand Up @@ -1055,6 +1057,10 @@ public boolean isRemoteSnapshot() {
return isRemoteSnapshot;
}

/**
 * Returns the value of the {@code isRemoteIndex} field.
 * NOTE(review): the constructor hunk shown alongside this change sets that field from
 * {@code IndexModule.Type.REMOTE_SEARCH.match(settings)} - confirm against the full file.
 */
public boolean isRemoteIndex() {
return isRemoteIndex;
}

/**
* If this is a remote snapshot and the extended compatibility
* feature flag is enabled, this returns the minimum {@link Version}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2191,7 +2191,7 @@ public void openEngineAndRecoverFromTranslog() throws IOException {
};

// Do not load the global checkpoint if this is a remote snapshot or remote search index
if (indexSettings.isRemoteSnapshot() == false) {
if (indexSettings.isRemoteSnapshot() == false && indexSettings.isRemoteIndex() == false) {
loadGlobalCheckpointToReplicationTracker();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,14 @@ public static UploadedSegmentMetadata fromString(String uploadedFilename) {
String[] values = uploadedFilename.split(SEPARATOR);
return new UploadedSegmentMetadata(values[0], values[1], values[2], Long.parseLong(values[3]));
}

/**
 * Returns the value of the {@code originalFilename} field.
 * NOTE(review): presumably the segment file name as known to the local Lucene index - confirm.
 */
public String getOriginalFilename() {
return originalFilename;
}

/**
 * Returns the value of the {@code uploadedFilename} field.
 * NOTE(review): presumably the name under which the file is stored in the remote store - confirm.
 */
public String getUploadedFilename() {
return uploadedFilename;
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.index.store.remote.directory;

import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.store.Lock;
import org.apache.lucene.store.NoLockFactory;
import org.opensearch.index.store.RemoteSegmentStoreDirectory;
import org.opensearch.index.store.remote.file.OnDemandBlockSearchIndexInput;
import org.opensearch.index.store.remote.utils.TransferManager;

import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Set;

/**
* A {@link Directory} implementation that reads index segment files directly from the remote store, based on the uploaded segment metadata.
*
* @opensearch.internal
*/
public final class RemoteSearchDirectory extends Directory {
private final FSDirectory localStoreDir;
private final TransferManager transferManager;

private final Map<String, RemoteSegmentStoreDirectory.UploadedSegmentMetadata> uploadedSegmentMetadataMap;

/**
 * Creates a directory over the given set of uploaded segment files.
 *
 * @param uploadedSegmentMetadataMap metadata of the uploaded segment files, keyed by the name used to look them up
 * @param localStoreDir              local directory handed to the index inputs opened by this directory
 * @param transferManager            transfer manager handed to the index inputs opened by this directory
 */
public RemoteSearchDirectory(
    Map<String, RemoteSegmentStoreDirectory.UploadedSegmentMetadata> uploadedSegmentMetadataMap,
    FSDirectory localStoreDir,
    TransferManager transferManager
) {
    this.uploadedSegmentMetadataMap = uploadedSegmentMetadataMap;
    this.localStoreDir = localStoreDir;
    this.transferManager = transferManager;
}

/**
 * Lists the names of all files known to this directory.
 *
 * @return the keys of the uploaded segment metadata map, sorted in natural {@link String} order
 *         as required by the {@link Directory#listAll()} contract
 */
@Override
public String[] listAll() throws IOException {
    // The backing map gives no ordering guarantee, so sort explicitly to honour the Lucene contract.
    return uploadedSegmentMetadataMap.keySet().stream().sorted().toArray(String[]::new);
}

/**
 * No-op: the body is empty, so nothing is deleted for the given name.
 */
@Override
public void deleteFile(String name) throws IOException {}

/**
 * Returns the length of the named file as recorded in its uploaded segment metadata.
 *
 * @param name the file name to look up
 * @return the length stored in the file's metadata
 * @throws java.nio.file.NoSuchFileException if no metadata is registered for {@code name}
 */
@Override
public long fileLength(String name) throws IOException {
    final RemoteSegmentStoreDirectory.UploadedSegmentMetadata metadata = uploadedSegmentMetadataMap.get(name);
    if (metadata == null) {
        // Report a missing file the way the Directory contract expects instead of failing with an NPE.
        throw new java.nio.file.NoSuchFileException(name);
    }
    return metadata.getLength();
}

/**
 * Returns the shared {@code NoopIndexOutput.INSTANCE}; the {@code name} and {@code context}
 * arguments are not used.
 */
@Override
public IndexOutput createOutput(String name, IOContext context) {
return NoopIndexOutput.INSTANCE;
}

/**
 * Opens an {@link OnDemandBlockSearchIndexInput} for the named file.
 *
 * @param name    the file name to open
 * @param context the IO context; not used by this implementation
 * @return an index input built from the file's uploaded metadata, the local store directory and the transfer manager
 * @throws java.nio.file.NoSuchFileException if no metadata is registered for {@code name}
 */
@Override
public IndexInput openInput(String name, IOContext context) throws IOException {
    final RemoteSegmentStoreDirectory.UploadedSegmentMetadata uploadedSegmentMetadata = uploadedSegmentMetadataMap.get(name);
    if (uploadedSegmentMetadata == null) {
        // Fail fast rather than handing null metadata to the index input.
        throw new java.nio.file.NoSuchFileException(name);
    }
    return new OnDemandBlockSearchIndexInput(uploadedSegmentMetadata, localStoreDir, transferManager);
}

/**
 * Closes the local store directory this instance was constructed with.
 */
@Override
public void close() throws IOException {
localStoreDir.close();
}

/**
 * Always returns an empty set.
 */
@Override
public Set<String> getPendingDeletions() throws IOException {
return Collections.emptySet();
}

/**
 * Not supported: always throws {@link UnsupportedOperationException}.
 */
@Override
public IndexOutput createTempOutput(String prefix, String suffix, IOContext context) {
throw new UnsupportedOperationException();
}

/**
 * No-op: the body is empty, so nothing is synced for the given names.
 */
@Override
public void sync(Collection<String> names) throws IOException {}

/**
 * No-op: the body is empty.
 */
@Override
public void syncMetaData() {}

/**
 * No-op: the body is empty, so no file is renamed.
 */
@Override
public void rename(String source, String dest) throws IOException {}

/**
 * Delegates to {@code NoLockFactory.INSTANCE}, passing {@code null} for both arguments;
 * the requested lock {@code name} is not used.
 * NOTE(review): presumably the returned lock performs no actual locking - confirm against Lucene's NoLockFactory.
 */
@Override
public Lock obtainLock(String name) throws IOException {
return NoLockFactory.INSTANCE.obtainLock(null, null);
}

static class NoopIndexOutput extends IndexOutput {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's pull the common functionalities including this class out to a new class RemoteDirectory which can be extended by RemoteSearchDirectory and RemoteSnapshotDirectory.

public abstract class RemoteDirectory extends Directory {
    ...
    <All empty and Noop based methods>
    ...
    ...
    static class NoopIndexOutput extends IndexOutput {
   }
}
public final class RemoteSnapshotDirectory extends RemoteDirectory {}
public final class RemoteSearchDirectory extends RemoteDirectory {}

That should prevent a bunch of NoOp duplication across classes, and can be overridden by specific implementations in future phases, if needed.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

RemoteDirectory is another class present ref, but I see your point we can take out the common part to another class.


// Single shared instance; the class keeps no state of its own, as every method below is empty or returns 0.
final static NoopIndexOutput INSTANCE = new NoopIndexOutput();

// Passes the fixed string "noop" for both arguments of the IndexOutput constructor.
NoopIndexOutput() {
super("noop", "noop");
}

// No-op: the body is empty.
@Override
public void close() throws IOException {

}

// Always reports position 0, since nothing is ever written.
@Override
public long getFilePointer() {
return 0;
}

// Always reports checksum 0, since nothing is ever written.
@Override
public long getChecksum() throws IOException {
return 0;
}

// Discards the byte: the body is empty.
@Override
public void writeByte(byte b) throws IOException {

}

// Discards the bytes: the body is empty.
@Override
public void writeBytes(byte[] b, int offset, int length) throws IOException {

}
}
}
Loading