diff --git a/server/src/internalClusterTest/java/org/opensearch/search/RemoteSearchIT.java b/server/src/internalClusterTest/java/org/opensearch/search/RemoteSearchIT.java
new file mode 100644
index 0000000000000..ee74c70b1ebd4
--- /dev/null
+++ b/server/src/internalClusterTest/java/org/opensearch/search/RemoteSearchIT.java
@@ -0,0 +1,119 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.search;
+
+import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;
+import org.junit.After;
+import org.junit.Before;
+import org.opensearch.action.admin.indices.close.CloseIndexRequest;
+import org.opensearch.action.admin.indices.open.OpenIndexRequest;
+import org.opensearch.action.admin.indices.settings.put.UpdateSettingsRequest;
+import org.opensearch.cluster.metadata.IndexMetadata;
+import org.opensearch.common.settings.Settings;
+import org.opensearch.common.util.FeatureFlags;
+import org.opensearch.index.IndexModule;
+import org.opensearch.index.store.remote.file.CleanerDaemonThreadLeakFilter;
+import org.opensearch.indices.replication.common.ReplicationType;
+import org.opensearch.snapshots.AbstractSnapshotIntegTestCase;
+import org.opensearch.test.OpenSearchIntegTestCase;
+
+import java.nio.file.Path;
+
+import static org.opensearch.index.IndexModule.INDEX_STORE_TYPE_SETTING;
+import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
+
+@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
+@ThreadLeakFilters(filters = CleanerDaemonThreadLeakFilter.class)
+public class RemoteSearchIT extends AbstractSnapshotIntegTestCase {
+
+    private static final String REPOSITORY_NAME = "test-remote-store-repo";
+
+    @Override
+    protected boolean addMockInternalEngine() {
+        return false;
+    }
+
+    @Override
+    protected Settings featureFlagSettings() {
+        return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.REMOTE_STORE, "true").build();
+    }
+
+    @Before
+    public void setup() {
+        internalCluster().startClusterManagerOnlyNode();
+        Path absolutePath = randomRepoPath().toAbsolutePath();
+        assertAcked(
+            clusterAdmin().preparePutRepository(REPOSITORY_NAME).setType("fs").setSettings(Settings.builder().put("location", absolutePath))
+        );
+    }
+
+    @After
+    public void teardown() {
+        assertAcked(clusterAdmin().prepareDeleteRepository(REPOSITORY_NAME));
+    }
+
+    private Settings remoteStoreIndexSettings(int numberOfReplicas) {
+        return Settings.builder()
+            .put(super.indexSettings())
+            .put("index.refresh_interval", "300s")
+            .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
+            .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, numberOfReplicas)
+            .put(IndexModule.INDEX_QUERY_CACHE_ENABLED_SETTING.getKey(), false)
+            .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
+            .put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true)
+            .put(IndexMetadata.SETTING_REMOTE_STORE_REPOSITORY, REPOSITORY_NAME)
+            .build();
+    }
+
+    private Settings remoteTranslogIndexSettings(int numberOfReplicas) {
+        return Settings.builder()
+            .put(remoteStoreIndexSettings(numberOfReplicas))
+            .put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_ENABLED, true)
+            .put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY, REPOSITORY_NAME)
+            .build();
+    }
+
+    public void testRemoteSearchIndex() throws Exception {
+        final String indexName = "test-idx-1";
+        final int numReplicasIndex = randomIntBetween(0, 3);
+        final int numOfDocs = 100;
+
+        // Spin up nodes with the search and data roles
+        internalCluster().ensureAtLeastNumSearchAndDataNodes(numReplicasIndex + 1);
+
+        // Create an index with the remote translog index settings
+        createIndex(indexName, Settings.builder()
+            .put(remoteTranslogIndexSettings(numReplicasIndex))
+            .build());
+        ensureGreen();
+
+        // Index some documents
+        indexRandomDocs(indexName, numOfDocs);
+        ensureGreen();
+        // Search the documents on the index
+        assertDocCount(indexName, 100L);
+
+        // Close the index
+        CloseIndexRequest closeIndexRequest = new CloseIndexRequest(indexName);
+        client().admin().indices().close(closeIndexRequest).actionGet();
+
+        // Apply the remote search setting to the index
+        client().admin().indices().updateSettings(new UpdateSettingsRequest(Settings.builder()
+            .put(INDEX_STORE_TYPE_SETTING.getKey(), "remote_search")
+            .build()
+        )).actionGet();
+
+        // Open the index back up
+        OpenIndexRequest openIndexRequest = new OpenIndexRequest(indexName);
+        client().admin().indices().open(openIndexRequest).actionGet();
+
+        // Perform the search on the index again
+        assertDocCount(indexName, 100L);
+    }
+}
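The integration test above drives the whole lifecycle; the step that actually engages the new code path is switching index.store.type to remote_search while the index is closed, since the store type is not a dynamically updatable setting. A minimal sketch of just that step (not something the patch itself adds), assuming a Client handle and an index that already uploads segments to the configured remote store repository, and reusing the request and settings classes imported by the test:

    // Hypothetical helper mirroring the test flow above; `client` and `indexName` are assumed inputs.
    static void switchToRemoteSearch(Client client, String indexName) {
        // The store type cannot change on an open index, so close it first.
        client.admin().indices().close(new CloseIndexRequest(indexName)).actionGet();

        // Point the index at the remote_search store type introduced by this patch.
        UpdateSettingsRequest request = new UpdateSettingsRequest(indexName).settings(
            Settings.builder().put(IndexModule.INDEX_STORE_TYPE_SETTING.getKey(), "remote_search").build()
        );
        client.admin().indices().updateSettings(request).actionGet();

        // Reopen the index; searches are now served through the remote-backed directory.
        client.admin().indices().open(new OpenIndexRequest(indexName)).actionGet();
    }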
diff --git a/server/src/main/java/org/opensearch/cluster/block/ClusterBlocks.java b/server/src/main/java/org/opensearch/cluster/block/ClusterBlocks.java
index cf77349271eb5..dfc613dbbd0c3 100644
--- a/server/src/main/java/org/opensearch/cluster/block/ClusterBlocks.java
+++ b/server/src/main/java/org/opensearch/cluster/block/ClusterBlocks.java
@@ -399,6 +399,9 @@ public Builder addBlocks(IndexMetadata indexMetadata) {
             if (IndexModule.Type.REMOTE_SNAPSHOT.match(indexMetadata.getSettings().get(IndexModule.INDEX_STORE_TYPE_SETTING.getKey()))) {
                 addIndexBlock(indexName, IndexMetadata.REMOTE_READ_ONLY_ALLOW_DELETE);
             }
+            if (IndexModule.Type.REMOTE_SEARCH.match(indexMetadata.getSettings().get(IndexModule.INDEX_STORE_TYPE_SETTING.getKey()))) {
+                addIndexBlock(indexName, IndexMetadata.REMOTE_READ_ONLY_ALLOW_DELETE);
+            }
             return this;
         }
 
diff --git a/server/src/main/java/org/opensearch/cluster/routing/RoutingPool.java b/server/src/main/java/org/opensearch/cluster/routing/RoutingPool.java
index a4ff237460e28..0e62ca3e61b1e 100644
--- a/server/src/main/java/org/opensearch/cluster/routing/RoutingPool.java
+++ b/server/src/main/java/org/opensearch/cluster/routing/RoutingPool.java
@@ -61,7 +61,8 @@ public static RoutingPool getShardPool(ShardRouting shard, RoutingAllocation all
      */
     public static RoutingPool getIndexPool(IndexMetadata indexMetadata) {
         Settings indexSettings = indexMetadata.getSettings();
-        if (IndexModule.Type.REMOTE_SNAPSHOT.match(indexSettings.get(IndexModule.INDEX_STORE_TYPE_SETTING.getKey()))) {
+        if (IndexModule.Type.REMOTE_SNAPSHOT.match(indexSettings.get(IndexModule.INDEX_STORE_TYPE_SETTING.getKey())) ||
+            IndexModule.Type.REMOTE_SEARCH.match(indexSettings.get(IndexModule.INDEX_STORE_TYPE_SETTING.getKey()))) {
             return REMOTE_CAPABLE;
         }
         return LOCAL_ONLY;
diff --git a/server/src/main/java/org/opensearch/index/IndexModule.java b/server/src/main/java/org/opensearch/index/IndexModule.java
index bdb043b7b9aa1..e1b57730ceb99 100644
--- a/server/src/main/java/org/opensearch/index/IndexModule.java
+++ b/server/src/main/java/org/opensearch/index/IndexModule.java
@@ -71,6 +71,7 @@
 import org.opensearch.index.shard.SearchOperationListener;
 import org.opensearch.index.similarity.SimilarityService;
 import org.opensearch.index.store.FsDirectoryFactory;
+import org.opensearch.index.store.remote.directory.RemoteSearchDirectoryFactory;
 import org.opensearch.index.store.remote.directory.RemoteSnapshotDirectoryFactory;
 import org.opensearch.index.store.remote.filecache.FileCache;
 import org.opensearch.index.translog.TranslogFactory;
@@ -407,7 +408,8 @@ public enum Type {
         MMAPFS("mmapfs"),
         SIMPLEFS("simplefs"),
         FS("fs"),
-        REMOTE_SNAPSHOT("remote_snapshot");
+        REMOTE_SNAPSHOT("remote_snapshot"),
+        REMOTE_SEARCH("remote_search");
 
         private final String settingsKey;
         private final boolean deprecated;
@@ -683,6 +685,12 @@ public static Map<String, IndexStorePlugin.DirectoryFactory> createBuiltInDirect
                     new RemoteSnapshotDirectoryFactory(repositoriesService, threadPool, remoteStoreFileCache)
                 );
                 break;
+            case REMOTE_SEARCH:
+                factories.put(
+                    type.getSettingsKey(),
+                    new RemoteSearchDirectoryFactory(repositoriesService, remoteStoreFileCache)
+                );
+                break;
             default:
                 throw new IllegalStateException("No directory factory mapping for built-in type " + type);
         }
diff --git a/server/src/main/java/org/opensearch/index/IndexSettings.java b/server/src/main/java/org/opensearch/index/IndexSettings.java
index e125facb76059..3ae8166ea54bb 100644
--- a/server/src/main/java/org/opensearch/index/IndexSettings.java
+++ b/server/src/main/java/org/opensearch/index/IndexSettings.java
@@ -601,6 +601,7 @@ public final class IndexSettings {
     private final String remoteStoreTranslogRepository;
     private final String remoteStoreRepository;
     private final boolean isRemoteSnapshot;
+    private final boolean isRemoteIndex;
     private Version extendedCompatibilitySnapshotVersion;
 
     // volatile fields are updated via #updateIndexMetadata(IndexMetadata) under lock
@@ -773,6 +774,7 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti
         );
         remoteStoreRepository = settings.get(IndexMetadata.SETTING_REMOTE_STORE_REPOSITORY);
         isRemoteSnapshot = IndexModule.Type.REMOTE_SNAPSHOT.match(this.settings);
+        isRemoteIndex = IndexModule.Type.REMOTE_SEARCH.match(this.settings);
 
         if (isRemoteSnapshot && FeatureFlags.isEnabled(SEARCHABLE_SNAPSHOT_EXTENDED_COMPATIBILITY)) {
             extendedCompatibilitySnapshotVersion = SEARCHABLE_SNAPSHOT_EXTENDED_COMPATIBILITY_MINIMUM_VERSION;
@@ -1066,6 +1068,10 @@ public boolean isRemoteSnapshot() {
         return isRemoteSnapshot;
     }
 
+    public boolean isRemoteIndex() {
+        return isRemoteIndex;
+    }
+
     /**
      * If this is a remote snapshot and the extended compatibility
      * feature flag is enabled, this returns the minimum {@link Version}
diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java
index 2ec0d186bf056..389d8761e6d62 100644
--- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java
+++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java
@@ -2185,7 +2185,7 @@ public void openEngineAndRecoverFromTranslog() throws IOException {
         };
 
         // Do not load the global checkpoint if this is a remote snapshot index
-        if (indexSettings.isRemoteSnapshot() == false) {
+        if (indexSettings.isRemoteSnapshot() == false && indexSettings.isRemoteIndex() == false) {
             loadGlobalCheckpointToReplicationTracker();
         }
 
diff --git a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java
index 9f41ac6f7fd17..ab111f3edafe8 100644
--- a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java
+++ b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java
@@ -189,6 +189,14 @@ public static UploadedSegmentMetadata fromString(String uploadedFilename) {
             String[] values = uploadedFilename.split(SEPARATOR);
             return new UploadedSegmentMetadata(values[0], values[1], values[2], Long.parseLong(values[3]));
         }
+
+        public String getOriginalFilename() {
+            return originalFilename;
+        }
+
+        public String getUploadedFilename() {
+            return uploadedFilename;
+        }
     }
 
     /**
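The getters added to UploadedSegmentMetadata above expose the mapping that the new remote search directory (next file) is built on: each metadata entry ties the original Lucene file name to the name the file was uploaded under, plus its checksum and length. A rough illustration of how such an entry round-trips through fromString, assuming a "::" separator and the field order implied by the constructor call above (the concrete separator is an internal detail of RemoteSegmentStoreDirectory, so treat the literal below as an example only):

    // Assumed metadata layout: originalFilename::uploadedFilename::checksum::length
    RemoteSegmentStoreDirectory.UploadedSegmentMetadata metadata =
        RemoteSegmentStoreDirectory.UploadedSegmentMetadata.fromString("_0.cfe::_0.cfe__AAAAAQ::123456789::2048");
    assert metadata.getOriginalFilename().equals("_0.cfe");          // the name Lucene asks the directory for
    assert metadata.getUploadedFilename().equals("_0.cfe__AAAAAQ");  // the blob name in the remote store
    assert metadata.getLength() == 2048L;                            // backs RemoteSearchDirectory.fileLength()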
diff --git a/server/src/main/java/org/opensearch/index/store/remote/directory/RemoteSearchDirectory.java b/server/src/main/java/org/opensearch/index/store/remote/directory/RemoteSearchDirectory.java
new file mode 100644
index 0000000000000..57983e1d76d3e
--- /dev/null
+++ b/server/src/main/java/org/opensearch/index/store/remote/directory/RemoteSearchDirectory.java
@@ -0,0 +1,132 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.index.store.remote.directory;
+
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.FSDirectory;
+import org.apache.lucene.store.IOContext;
+import org.apache.lucene.store.IndexInput;
+import org.apache.lucene.store.IndexOutput;
+import org.apache.lucene.store.Lock;
+import org.apache.lucene.store.NoLockFactory;
+import org.opensearch.index.store.RemoteSegmentStoreDirectory;
+import org.opensearch.index.store.remote.file.OnDemandBlockSearchIndexInput;
+import org.opensearch.index.store.remote.utils.TransferManager;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * A Directory implementation that reads segments directly from the segment metadata uploaded to the remote store.
+ *
+ * @opensearch.internal
+ */
+public final class RemoteSearchDirectory extends Directory {
+    private final FSDirectory localStoreDir;
+    private final TransferManager transferManager;
+
+    private final Map<String, RemoteSegmentStoreDirectory.UploadedSegmentMetadata> uploadedSegmentMetadataMap;
+
+    public RemoteSearchDirectory(
+        Map<String, RemoteSegmentStoreDirectory.UploadedSegmentMetadata> uploadedSegmentMetadataMap,
+        FSDirectory localStoreDir,
+        TransferManager transferManager
+    ) {
+        this.localStoreDir = localStoreDir;
+        this.transferManager = transferManager;
+        this.uploadedSegmentMetadataMap = uploadedSegmentMetadataMap;
+    }
+
+    @Override
+    public String[] listAll() throws IOException {
+        return uploadedSegmentMetadataMap.keySet().toArray(new String[0]);
+    }
+
+    @Override
+    public void deleteFile(String name) throws IOException {}
+
+    @Override
+    public long fileLength(String name) throws IOException {
+        return uploadedSegmentMetadataMap.get(name).getLength();
+    }
+
+    @Override
+    public IndexOutput createOutput(String name, IOContext context) {
+        return NoopIndexOutput.INSTANCE;
+    }
+
+    @Override
+    public IndexInput openInput(String name, IOContext context) throws IOException {
+        final RemoteSegmentStoreDirectory.UploadedSegmentMetadata uploadedSegmentMetadata = uploadedSegmentMetadataMap.get(name);
+        return new OnDemandBlockSearchIndexInput(uploadedSegmentMetadata, localStoreDir, transferManager);
+    }
+
+    @Override
+    public void close() throws IOException {
+        localStoreDir.close();
+    }
+
+    @Override
+    public Set<String> getPendingDeletions() throws IOException {
+        return Collections.emptySet();
+    }
+
+    @Override
+    public IndexOutput createTempOutput(String prefix, String suffix, IOContext context) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void sync(Collection<String> names) throws IOException {}
+
+    @Override
+    public void syncMetaData() {}
+
+    @Override
+    public void rename(String source, String dest) throws IOException {}
+
+    @Override
+    public Lock obtainLock(String name) throws IOException {
+        return NoLockFactory.INSTANCE.obtainLock(null, null);
+    }
+
+    static class NoopIndexOutput extends IndexOutput {
+
+        final static NoopIndexOutput INSTANCE = new NoopIndexOutput();
+
+        NoopIndexOutput() {
+            super("noop", "noop");
+        }
+
+        @Override
+        public void close() throws IOException {}
+
+        @Override
+        public long getFilePointer() {
+            return 0;
+        }
+
+        @Override
+        public long getChecksum() throws IOException {
+            return 0;
+        }
+
+        @Override
+        public void writeByte(byte b) throws IOException {}
+
+        @Override
+        public void writeBytes(byte[] b, int offset, int length) throws IOException {}
+    }
+}
diff --git a/server/src/main/java/org/opensearch/index/store/remote/directory/RemoteSearchDirectoryFactory.java b/server/src/main/java/org/opensearch/index/store/remote/directory/RemoteSearchDirectoryFactory.java
new file mode 100644
index 0000000000000..9c05958296c1d
--- /dev/null
+++ b/server/src/main/java/org/opensearch/index/store/remote/directory/RemoteSearchDirectoryFactory.java
@@ -0,0 +1,97 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.index.store.remote.directory;
+
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.FSDirectory;
+import org.opensearch.common.blobstore.BlobContainer;
+import org.opensearch.common.blobstore.BlobPath;
+import org.opensearch.index.IndexSettings;
+import org.opensearch.index.shard.ShardPath;
+import org.opensearch.index.store.RemoteDirectory;
+import org.opensearch.index.store.RemoteSegmentStoreDirectory;
+import org.opensearch.index.store.remote.filecache.FileCache;
+import org.opensearch.index.store.remote.utils.TransferManager;
+import org.opensearch.plugins.IndexStorePlugin;
+import org.opensearch.repositories.RepositoriesService;
+import org.opensearch.repositories.Repository;
+import org.opensearch.repositories.RepositoryMissingException;
+import org.opensearch.repositories.blobstore.BlobStoreRepository;
+import org.opensearch.threadpool.ThreadPool;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Map;
+import java.util.function.Supplier;
+
+import static org.opensearch.index.store.remote.directory.RemoteSnapshotDirectoryFactory.LOCAL_STORE_LOCATION;
+
+/**
+ * Factory for a remote search directory
+ *
+ * @opensearch.internal
+ */
+public class RemoteSearchDirectoryFactory implements IndexStorePlugin.DirectoryFactory {
+
+    private final Supplier<RepositoriesService> repositoriesService;
+    private final FileCache remoteStoreFileCache;
+
+    public RemoteSearchDirectoryFactory(Supplier<RepositoriesService> repositoriesService, FileCache remoteStoreFileCache) {
+        this.repositoriesService = repositoriesService;
+        this.remoteStoreFileCache = remoteStoreFileCache;
+    }
+
+    @Override
+    public Directory newDirectory(IndexSettings indexSettings, ShardPath path) throws IOException {
+        String repositoryName = indexSettings.getRemoteStoreRepository();
+        try (Repository repository = repositoriesService.get().repository(repositoryName)) {
+            assert repository instanceof BlobStoreRepository : "repository should be instance of BlobStoreRepository";
+            BlobStoreRepository blobStoreRepository = (BlobStoreRepository) repository;
+            return createRemoteSearchDirectory(indexSettings, path, blobStoreRepository);
+        } catch (RepositoryMissingException e) {
+            throw new IllegalArgumentException("Repository should be created before creating index with remote_search enabled setting", e);
+        }
+    }
+
+    private RemoteSearchDirectory createRemoteSearchDirectory(
+        IndexSettings indexSettings,
+        ShardPath localShardPath,
+        BlobStoreRepository blobStoreRepository
+    ) throws IOException {
+        BlobPath commonBlobPath = blobStoreRepository.basePath()
+            .add(indexSettings.getIndex().getUUID())
+            .add(String.valueOf(localShardPath.getShardId().getId()))
+            .add("segments");
+
+        // These directories are initialized again because the composite directory implementation does not exist yet,
+        // so there is no way to pass the remote segment directory info to this directory.
+        RemoteDirectory dataDirectory = createRemoteDirectory(blobStoreRepository, commonBlobPath, "data");
+        RemoteDirectory metadataDirectory = createRemoteDirectory(blobStoreRepository, commonBlobPath, "metadata");
+
+        RemoteSegmentStoreDirectory remoteSegmentStoreDirectory = new RemoteSegmentStoreDirectory(dataDirectory, metadataDirectory);
+        Map<String, RemoteSegmentStoreDirectory.UploadedSegmentMetadata> segmentsUploadedToRemoteStore = remoteSegmentStoreDirectory
+            .getSegmentsUploadedToRemoteStore();
+
+        Path localStorePath = localShardPath.getDataPath().resolve(LOCAL_STORE_LOCATION);
+        FSDirectory localStoreDir = FSDirectory.open(Files.createDirectories(localStorePath));
+        // make sure the directory is flushed to persistent storage
+        localStoreDir.syncMetaData();
+
+        final BlobContainer blobContainer = blobStoreRepository.blobStore().blobContainer(commonBlobPath.add("data"));
+        TransferManager transferManager = new TransferManager(blobContainer, remoteStoreFileCache);
+        return new RemoteSearchDirectory(segmentsUploadedToRemoteStore, localStoreDir, transferManager);
+    }
+
+    private RemoteDirectory createRemoteDirectory(Repository repository, BlobPath commonBlobPath, String extension) {
+        BlobPath extendedPath = commonBlobPath.add(extension);
+        BlobContainer dataBlobContainer = ((BlobStoreRepository) repository).blobStore().blobContainer(extendedPath);
+        return new RemoteDirectory(dataBlobContainer);
+    }
+}
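Taken together, RemoteSearchDirectoryFactory resolves the index's remote store repository, rebuilds the uploaded-segments view through RemoteSegmentStoreDirectory, and hands that metadata map to RemoteSearchDirectory along with a TransferManager for block downloads. A minimal sketch of driving the factory outside IndexModule, assuming the caller already holds the node's RepositoriesService, FileCache, IndexSettings and ShardPath (all assumed here; in the patch IndexModule performs this wiring for the remote_search store type):

    static Directory openForSearch(
        RepositoriesService repositoriesService,
        FileCache remoteStoreFileCache,
        IndexSettings indexSettings,
        ShardPath shardPath
    ) throws IOException {
        RemoteSearchDirectoryFactory factory = new RemoteSearchDirectoryFactory(() -> repositoriesService, remoteStoreFileCache);
        // Lists segment files from the uploaded metadata and fetches their bytes block-by-block on first read.
        return factory.newDirectory(indexSettings, shardPath);
    }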
diff --git a/server/src/main/java/org/opensearch/index/store/remote/file/OnDemandBlockSearchIndexInput.java b/server/src/main/java/org/opensearch/index/store/remote/file/OnDemandBlockSearchIndexInput.java
new file mode 100644
index 0000000000000..a5d144a0c3794
--- /dev/null
+++ b/server/src/main/java/org/opensearch/index/store/remote/file/OnDemandBlockSearchIndexInput.java
@@ -0,0 +1,152 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.index.store.remote.file;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.apache.lucene.store.FSDirectory;
+import org.apache.lucene.store.IndexInput;
+import org.opensearch.index.store.RemoteSegmentStoreDirectory;
+import org.opensearch.index.store.remote.utils.BlobFetchRequest;
+import org.opensearch.index.store.remote.utils.TransferManager;
+
+import java.io.IOException;
+
+/**
+ * An implementation of {@link OnDemandBlockIndexInput} that provides the main IndexInput using uploaded segment metadata files.
+ * It relies on {@link TransferManager} to fetch the segment files from the remote blob store and cache them locally.
+ *
+ * @opensearch.internal
+ */
+public class OnDemandBlockSearchIndexInput extends OnDemandBlockIndexInput {
+    private static final Logger logger = LogManager.getLogger(OnDemandBlockSearchIndexInput.class);
+
+    /**
+     * Where this class fetches IndexInput parts from
+     */
+    final TransferManager transferManager;
+
+    /**
+     * Uploaded metadata info for this IndexInput
+     */
+    protected final RemoteSegmentStoreDirectory.UploadedSegmentMetadata uploadedSegmentMetadata;
+
+    /**
+     * Underlying lucene directory to open blocks and for caching
+     */
+    protected final FSDirectory directory;
+
+    /**
+     * File name
+     */
+    protected final String fileName;
+
+    /**
+     * Size of the file, larger than length if it's a slice
+     */
+    protected final long originalFileSize;
+
+    public OnDemandBlockSearchIndexInput(
+        RemoteSegmentStoreDirectory.UploadedSegmentMetadata uploadedSegmentMetadata,
+        FSDirectory directory,
+        TransferManager transferManager
+    ) {
+        this(
+            "BlockedSearchIndexInput(path=\""
+                + directory.getDirectory().toString()
+                + "/"
+                + uploadedSegmentMetadata.getOriginalFilename()
+                + "\", "
+                + "offset="
+                + 0
+                + ", length= "
+                + uploadedSegmentMetadata.getLength()
+                + ")",
+            uploadedSegmentMetadata,
+            0L,
+            uploadedSegmentMetadata.getLength(),
+            false,
+            directory,
+            transferManager
+        );
+    }
+
+    public OnDemandBlockSearchIndexInput(
+        String resourceDescription,
+        RemoteSegmentStoreDirectory.UploadedSegmentMetadata uploadedSegmentMetadata,
+        long offset,
+        long length,
+        boolean isClone,
+        FSDirectory directory,
+        TransferManager transferManager
+    ) {
+        this(
+            OnDemandBlockIndexInput.builder().resourceDescription(resourceDescription).isClone(isClone).offset(offset).length(length),
+            uploadedSegmentMetadata,
+            directory,
+            transferManager
+        );
+    }
+
+    OnDemandBlockSearchIndexInput(
+        Builder builder,
+        RemoteSegmentStoreDirectory.UploadedSegmentMetadata uploadedSegmentMetadata,
+        FSDirectory directory,
+        TransferManager transferManager
+    ) {
+        super(builder);
+        this.transferManager = transferManager;
+        this.uploadedSegmentMetadata = uploadedSegmentMetadata;
+        this.fileName = uploadedSegmentMetadata.getOriginalFilename();
+        this.directory = directory;
+        this.originalFileSize = uploadedSegmentMetadata.getLength();
+    }
+
+    @Override
+    protected OnDemandBlockSearchIndexInput buildSlice(String sliceDescription, long offset, long length) {
+        return new OnDemandBlockSearchIndexInput(
+            OnDemandBlockIndexInput.builder()
+                .blockSizeShift(blockSizeShift)
+                .isClone(true)
+                .offset(this.offset + offset)
+                .length(length)
+                .resourceDescription(sliceDescription),
+            uploadedSegmentMetadata,
+            directory,
+            transferManager
+        );
+    }
+
+    @Override
+    protected IndexInput fetchBlock(int blockId) throws IOException {
+        final String blockFileName = uploadedSegmentMetadata.getUploadedFilename() + "." + blockId;
+
+        final long blockStart = getBlockStart(blockId);
+        final long blockEnd = blockStart + getActualBlockSize(blockId);
+        final long length = blockEnd - blockStart;
+
+        BlobFetchRequest blobFetchRequest = BlobFetchRequest.builder()
+            .position(blockStart)
+            .length(length)
+            .blobName(uploadedSegmentMetadata.getUploadedFilename())
+            .directory(directory)
+            .fileName(blockFileName)
+            .build();
+        return transferManager.fetchBlob(blobFetchRequest);
+    }
+
+    @Override
+    public OnDemandBlockSearchIndexInput clone() {
+        OnDemandBlockSearchIndexInput clone = buildSlice("clone", 0L, this.length);
+        // ensures that clones may be positioned at the same point as the blocked file they were cloned from
+        clone.cloneBlock(this);
+        return clone;
+    }
+
+    protected long getActualBlockSize(int blockId) {
+        return (blockId != getBlock(originalFileSize - 1)) ? blockSize : getBlockOffset(originalFileSize - 1) + 1;
+    }
+}
diff --git a/server/src/main/java/org/opensearch/index/store/remote/utils/TransferManager.java b/server/src/main/java/org/opensearch/index/store/remote/utils/TransferManager.java
index c9469283ee921..3eade788cfa71 100644
--- a/server/src/main/java/org/opensearch/index/store/remote/utils/TransferManager.java
+++ b/server/src/main/java/org/opensearch/index/store/remote/utils/TransferManager.java
@@ -84,7 +84,7 @@ private static FileCachedIndexInput createIndexInput(FileCache fileCache, BlobCo
         try {
             if (Files.exists(request.getFilePath()) == false) {
                 try (
-                    InputStream snapshotFileInputStream = blobContainer.readBlob(
+                    InputStream inputStream = blobContainer.readBlob(
                         request.getBlobName(),
                         request.getPosition(),
                         request.getLength()
@@ -92,7 +92,7 @@ private static FileCachedIndexInput createIndexInput(FileCache fileCache, BlobCo
                     OutputStream fileOutputStream = Files.newOutputStream(request.getFilePath());
                     OutputStream localFileOutputStream = new BufferedOutputStream(fileOutputStream)
                 ) {
-                    snapshotFileInputStream.transferTo(localFileOutputStream);
+                    inputStream.transferTo(localFileOutputStream);
                 }
             }
             final IndexInput luceneIndexInput = request.getDirectory().openInput(request.getFileName(), IOContext.READ);
diff --git a/server/src/main/java/org/opensearch/indices/IndicesService.java b/server/src/main/java/org/opensearch/indices/IndicesService.java
index 58a26f813d88d..a1d505575654a 100644
--- a/server/src/main/java/org/opensearch/indices/IndicesService.java
+++ b/server/src/main/java/org/opensearch/indices/IndicesService.java
@@ -847,7 +847,7 @@ private EngineFactory getEngineFactory(final IndexSettings idxSettings) {
             .filter(maybe -> Objects.requireNonNull(maybe).isPresent())
             .collect(Collectors.toList());
         if (engineFactories.isEmpty()) {
-            if (idxSettings.isRemoteSnapshot()) {
+            if (idxSettings.isRemoteSnapshot() || idxSettings.isRemoteIndex()) {
                 return config -> new ReadOnlyEngine(config, new SeqNoStats(0, 0, 0), new TranslogStats(), true, Function.identity(), false);
             }
             if (idxSettings.isSegRepEnabled()) {
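In OnDemandBlockSearchIndexInput above, fetchBlock() and getActualBlockSize() decide which byte range is requested from the blob store and which local block file it is cached under. A small worked example of that arithmetic, re-implemented locally for illustration and assuming an 8 MiB block size (blockSizeShift = 23 is an assumption; the real shift comes from OnDemandBlockIndexInput's builder):

    public static void main(String[] args) {
        int blockSizeShift = 23;                        // assumed block size: 8 MiB
        long blockSize = 1L << blockSizeShift;          // 8_388_608 bytes
        long fileLength = 20_000_000L;                  // e.g. uploadedSegmentMetadata.getLength()

        int lastBlock = (int) ((fileLength - 1) >>> blockSizeShift);   // 2, so the file spans blocks 0..2
        long lastBlockSize = ((fileLength - 1) & (blockSize - 1)) + 1; // 3_222_784 bytes in the final block
        long blockStart = (long) lastBlock << blockSizeShift;          // 16_777_216, the offset passed to BlobFetchRequest
        String blockFile = "_0.cfe__AAAAAQ" + "." + lastBlock;         // cached locally as <uploadedFilename>.<blockId>
        System.out.println(blockFile + " covers [" + blockStart + ", " + (blockStart + lastBlockSize) + ")");
    }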
diff --git a/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java b/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java
index 5fb84a165c498..8333f744ad058 100644
--- a/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java
+++ b/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java
@@ -248,7 +248,7 @@ private void doRecovery(final long recoveryId, final StartRecoveryRequest preExi
                 indexShard.syncSegmentsFromRemoteSegmentStore(false, false);
             }
             final boolean hasRemoteTranslog = recoveryTarget.state().getPrimary() == false && indexShard.isRemoteTranslogEnabled();
-            final boolean hasNoTranslog = indexShard.indexSettings().isRemoteSnapshot();
+            final boolean hasNoTranslog = indexShard.indexSettings().isRemoteSnapshot() || indexShard.indexSettings().isRemoteIndex();
             final boolean verifyTranslog = (hasRemoteTranslog || hasNoTranslog || hasRemoteSegmentStore) == false;
             final long startingSeqNo = indexShard.recoverLocallyAndFetchStartSeqNo(!hasRemoteTranslog);
             assert startingSeqNo == UNASSIGNED_SEQ_NO || recoveryTarget.state().getStage() == RecoveryState.Stage.TRANSLOG
diff --git a/server/src/main/java/org/opensearch/indices/recovery/RecoveryTarget.java b/server/src/main/java/org/opensearch/indices/recovery/RecoveryTarget.java
index 7466ab5c1ca86..d94e7ce599907 100644
--- a/server/src/main/java/org/opensearch/indices/recovery/RecoveryTarget.java
+++ b/server/src/main/java/org/opensearch/indices/recovery/RecoveryTarget.java
@@ -359,7 +359,7 @@ public void cleanFiles(
             // their own commit points and therefore do not modify the commit user data
             // in their store. In these cases, reuse the primary's translog UUID.
             final boolean reuseTranslogUUID = indexShard.indexSettings().isSegRepEnabled()
-                || indexShard.indexSettings().isRemoteSnapshot();
+                || indexShard.indexSettings().isRemoteSnapshot() || indexShard.indexSettings().isRemoteIndex();
             if (reuseTranslogUUID) {
                 final String translogUUID = store.getMetadata().getCommitUserData().get(TRANSLOG_UUID_KEY);
                 Translog.createEmptyTranslog(