Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Integrate remote translog download on failover #5699

Merged
merged 13 commits into from
Jan 9, 2023
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Prevent deletion of snapshots that are backing searchable snapshot indexes ([#5069](https://github.com/opensearch-project/OpenSearch/pull/5069))
- Add max_shard_size parameter for shrink API ([#5229](https://github.com/opensearch-project/OpenSearch/pull/5229))
- Added support to apply index create block ([#4603](https://github.com/opensearch-project/OpenSearch/issues/4603))
- Support request level durability for remote-backed indexes ([#5671](https://github.com/opensearch-project/OpenSearch/issues/5671))

### Dependencies
- Bumps `bcpg-fips` from 1.0.5.1 to 1.0.7.1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -699,7 +699,7 @@ public static final IndexShard newIndexShard(
() -> {},
RetentionLeaseSyncer.EMPTY,
cbs,
new InternalTranslogFactory(),
(indexSettings, shardRouting) -> new InternalTranslogFactory(),
SegmentReplicationCheckpointPublisher.EMPTY,
null
);
Expand Down
6 changes: 4 additions & 2 deletions server/src/main/java/org/opensearch/index/IndexModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.opensearch.Version;
import org.opensearch.client.Client;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.CheckedFunction;
import org.opensearch.common.TriFunction;
Expand Down Expand Up @@ -71,6 +72,7 @@
import org.opensearch.index.similarity.SimilarityService;
import org.opensearch.index.store.FsDirectoryFactory;
import org.opensearch.index.store.remote.directory.RemoteSnapshotDirectoryFactory;
import org.opensearch.index.translog.TranslogFactory;
import org.opensearch.indices.IndicesQueryCache;
import org.opensearch.indices.breaker.CircuitBreakerService;
import org.opensearch.indices.fielddata.cache.IndicesFieldDataCache;
Expand Down Expand Up @@ -493,7 +495,7 @@ public IndexService newIndexService(
BooleanSupplier idFieldDataEnabled,
ValuesSourceRegistry valuesSourceRegistry,
IndexStorePlugin.RemoteDirectoryFactory remoteDirectoryFactory,
Supplier<RepositoriesService> repositoriesServiceSupplier
BiFunction<IndexSettings, ShardRouting, TranslogFactory> translogFactorySupplier
) throws IOException {
final IndexEventListener eventListener = freeze();
Function<IndexService, CheckedFunction<DirectoryReader, DirectoryReader, IOException>> readerWrapperFactory = indexReaderWrapper
Expand Down Expand Up @@ -549,7 +551,7 @@ public IndexService newIndexService(
expressionResolver,
valuesSourceRegistry,
recoveryStateFactory,
repositoriesServiceSupplier
translogFactorySupplier
);
success = true;
return indexService;
Expand Down
20 changes: 5 additions & 15 deletions server/src/main/java/org/opensearch/index/IndexService.java
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,6 @@
import org.opensearch.index.shard.ShardPath;
import org.opensearch.index.similarity.SimilarityService;
import org.opensearch.index.store.Store;
import org.opensearch.index.translog.InternalTranslogFactory;
import org.opensearch.index.translog.RemoteBlobStoreInternalTranslogFactory;
import org.opensearch.index.translog.Translog;
import org.opensearch.index.translog.TranslogFactory;
import org.opensearch.indices.breaker.CircuitBreakerService;
Expand All @@ -99,7 +97,6 @@
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
import org.opensearch.plugins.IndexStorePlugin;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.script.ScriptService;
import org.opensearch.search.aggregations.support.ValuesSourceRegistry;
import org.opensearch.threadpool.ThreadPool;
Expand All @@ -117,6 +114,7 @@
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiFunction;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;
import java.util.function.Function;
Expand Down Expand Up @@ -176,7 +174,7 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
private final IndexNameExpressionResolver expressionResolver;
private final Supplier<Sort> indexSortSupplier;
private final ValuesSourceRegistry valuesSourceRegistry;
private final Supplier<RepositoriesService> repositoriesServiceSupplier;
private final BiFunction<IndexSettings, ShardRouting, TranslogFactory> translogFactorySupplier;

public IndexService(
IndexSettings indexSettings,
Expand Down Expand Up @@ -209,7 +207,7 @@ public IndexService(
IndexNameExpressionResolver expressionResolver,
ValuesSourceRegistry valuesSourceRegistry,
IndexStorePlugin.RecoveryStateFactory recoveryStateFactory,
Supplier<RepositoriesService> repositoriesServiceSupplier
BiFunction<IndexSettings, ShardRouting, TranslogFactory> translogFactorySupplier
) {
super(indexSettings);
this.allowExpensiveQueries = allowExpensiveQueries;
Expand Down Expand Up @@ -281,7 +279,7 @@ public IndexService(
this.trimTranslogTask = new AsyncTrimTranslogTask(this);
this.globalCheckpointTask = new AsyncGlobalCheckpointTask(this);
this.retentionLeaseSyncTask = new AsyncRetentionLeaseSyncTask(this);
this.repositoriesServiceSupplier = repositoriesServiceSupplier;
this.translogFactorySupplier = translogFactorySupplier;
updateFsyncTaskIfNecessary();
}

Expand Down Expand Up @@ -524,14 +522,6 @@ public synchronized IndexShard createShard(
remoteStore = new Store(shardId, this.indexSettings, remoteDirectory, lock, Store.OnClose.EMPTY);
}

TranslogFactory translogFactory = this.indexSettings.isRemoteTranslogStoreEnabled() && routing.primary()
? new RemoteBlobStoreInternalTranslogFactory(
repositoriesServiceSupplier,
threadPool,
this.indexSettings.getRemoteStoreTranslogRepository()
)
: new InternalTranslogFactory();

Directory directory = directoryFactory.newDirectory(this.indexSettings, path);
store = new Store(
shardId,
Expand Down Expand Up @@ -562,7 +552,7 @@ public synchronized IndexShard createShard(
() -> globalCheckpointSyncer.accept(shardId),
retentionLeaseSyncer,
circuitBreakerService,
translogFactory,
translogFactorySupplier,
this.indexSettings.isSegRepEnabled() ? checkpointPublisher : null,
remoteStore
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.LongSupplier;
Expand Down Expand Up @@ -321,7 +322,7 @@ Runnable getGlobalCheckpointSyncer() {
private final RefreshPendingLocationListener refreshPendingLocationListener;
private volatile boolean useRetentionLeasesInPeerRecovery;
private final Store remoteStore;
private final TranslogFactory translogFactory;
private final BiFunction<IndexSettings, ShardRouting, TranslogFactory> translogFactorySupplier;

public IndexShard(
final ShardRouting shardRouting,
Expand All @@ -344,7 +345,7 @@ public IndexShard(
final Runnable globalCheckpointSyncer,
final RetentionLeaseSyncer retentionLeaseSyncer,
final CircuitBreakerService circuitBreakerService,
final TranslogFactory translogFactory,
final BiFunction<IndexSettings, ShardRouting, TranslogFactory> translogFactorySupplier,
@Nullable final SegmentReplicationCheckpointPublisher checkpointPublisher,
@Nullable final Store remoteStore
) throws IOException {
Expand Down Expand Up @@ -431,7 +432,7 @@ public boolean shouldCache(Query query) {
this.refreshPendingLocationListener = new RefreshPendingLocationListener();
this.checkpointPublisher = checkpointPublisher;
this.remoteStore = remoteStore;
this.translogFactory = translogFactory;
this.translogFactorySupplier = translogFactorySupplier;
}

public ThreadPool getThreadPool() {
Expand Down Expand Up @@ -3420,7 +3421,7 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) thro
() -> getOperationPrimaryTerm(),
tombstoneDocSupplier(),
isReadOnlyReplica,
translogFactory
translogFactorySupplier.apply(indexSettings, shardRouting)
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,7 @@ public static TranslogTransferManager buildTranslogTransferManager(
return new TranslogTransferManager(
new BlobStoreTransferService(blobStoreRepository.blobStore(), executorService),
blobStoreRepository.basePath().add(shardId.getIndex().getUUID()).add(String.valueOf(shardId.id())),
fileTransferTracker,
fileTransferTracker::exclusionFilter
fileTransferTracker
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.opensearch.index.translog.transfer.listener.FileTransferListener;

import java.util.HashSet;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
Expand All @@ -32,8 +33,16 @@ public FileTransferTracker(ShardId shardId) {

@Override
public void onSuccess(TransferFileSnapshot fileSnapshot) {
TransferState targetState = TransferState.SUCCESS;
fileTransferTracker.compute(fileSnapshot.getName(), (k, v) -> {
add(fileSnapshot.getName(), TransferState.SUCCESS);
}

/**
 * Records the outcome of a transfer for the given file name.
 *
 * @param file    name of the file whose transfer state is being recorded
 * @param success {@code true} to record {@code TransferState.SUCCESS},
 *                {@code false} to record {@code TransferState.FAILED}
 */
void add(String file, boolean success) {
    if (success) {
        add(file, TransferState.SUCCESS);
    } else {
        add(file, TransferState.FAILED);
    }
}

private void add(String file, TransferState targetState) {
fileTransferTracker.compute(file, (k, v) -> {
if (v == null || v.validateNextState(targetState)) {
return targetState;
}
Expand All @@ -43,13 +52,7 @@ public void onSuccess(TransferFileSnapshot fileSnapshot) {

@Override
public void onFailure(TransferFileSnapshot fileSnapshot, Exception e) {
TransferState targetState = TransferState.FAILED;
fileTransferTracker.compute(fileSnapshot.getName(), (k, v) -> {
if (v == null || v.validateNextState(targetState)) {
return targetState;
}
throw new IllegalStateException("Unexpected transfer state " + v + "while setting target to" + targetState);
});
add(fileSnapshot.getName(), TransferState.FAILED);
}

public Set<TransferFileSnapshot> exclusionFilter(Set<TransferFileSnapshot> original) {
Expand Down Expand Up @@ -80,7 +83,7 @@ public boolean validateNextState(TransferState target) {
case FAILED:
return true;
case SUCCESS:
return Set.of(SUCCESS).contains(target);
return Objects.equals(SUCCESS, target);
}
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.lucene.store.ByteArrayIndexInput;
import org.opensearch.index.translog.Translog;
import org.opensearch.index.translog.transfer.listener.FileTransferListener;
import org.opensearch.index.translog.transfer.listener.TranslogTransferListener;

import java.io.IOException;
Expand All @@ -33,7 +32,6 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;

import static org.opensearch.index.translog.transfer.FileSnapshot.TransferFileSnapshot;
Expand All @@ -48,9 +46,8 @@ public class TranslogTransferManager {

private final TransferService transferService;
private final BlobPath remoteBaseTransferPath;
private final BlobPath remoteMetadaTransferPath;
private final FileTransferListener fileTransferListener;
private final UnaryOperator<Set<TransferFileSnapshot>> exclusionFilter;
private final BlobPath remoteMetadataTransferPath;
private final FileTransferTracker fileTransferTracker;

private static final long TRANSFER_TIMEOUT_IN_MILLIS = 30000;

Expand All @@ -61,31 +58,29 @@ public class TranslogTransferManager {
public TranslogTransferManager(
TransferService transferService,
BlobPath remoteBaseTransferPath,
FileTransferListener fileTransferListener,
UnaryOperator<Set<TransferFileSnapshot>> exclusionFilter
FileTransferTracker fileTransferTracker
) {
this.transferService = transferService;
this.remoteBaseTransferPath = remoteBaseTransferPath;
this.remoteMetadaTransferPath = remoteBaseTransferPath.add(METADATA_DIR);
this.fileTransferListener = fileTransferListener;
this.exclusionFilter = exclusionFilter;
this.remoteMetadataTransferPath = remoteBaseTransferPath.add(METADATA_DIR);
this.fileTransferTracker = fileTransferTracker;
}

public boolean transferSnapshot(TransferSnapshot transferSnapshot, TranslogTransferListener translogTransferListener)
throws IOException {
List<Exception> exceptionList = new ArrayList<>(transferSnapshot.getTranslogTransferMetadata().getCount());
Set<TransferFileSnapshot> toUpload = new HashSet<>(transferSnapshot.getTranslogTransferMetadata().getCount());
try {
toUpload.addAll(exclusionFilter.apply(transferSnapshot.getTranslogFileSnapshots()));
toUpload.addAll(exclusionFilter.apply(transferSnapshot.getCheckpointFileSnapshots()));
toUpload.addAll(fileTransferTracker.exclusionFilter(transferSnapshot.getTranslogFileSnapshots()));
toUpload.addAll(fileTransferTracker.exclusionFilter((transferSnapshot.getCheckpointFileSnapshots())));
if (toUpload.isEmpty()) {
logger.trace("Nothing to upload for transfer");
translogTransferListener.onUploadComplete(transferSnapshot);
return true;
}
final CountDownLatch latch = new CountDownLatch(toUpload.size());
LatchedActionListener<TransferFileSnapshot> latchedActionListener = new LatchedActionListener<>(
ActionListener.wrap(fileTransferListener::onSuccess, ex -> {
ActionListener.wrap(fileTransferTracker::onSuccess, ex -> {
assert ex instanceof FileTransferException;
logger.error(
() -> new ParameterizedMessage(
Expand All @@ -95,7 +90,7 @@ public boolean transferSnapshot(TransferSnapshot transferSnapshot, TranslogTrans
ex
);
FileTransferException e = (FileTransferException) ex;
fileTransferListener.onFailure(e.getFileSnapshot(), ex);
fileTransferTracker.onFailure(e.getFileSnapshot(), ex);
exceptionList.add(ex);
}),
latch
Expand All @@ -119,7 +114,7 @@ public boolean transferSnapshot(TransferSnapshot transferSnapshot, TranslogTrans
throw ex;
}
if (exceptionList.isEmpty()) {
transferService.uploadBlob(prepareMetadata(transferSnapshot), remoteMetadaTransferPath);
transferService.uploadBlob(prepareMetadata(transferSnapshot), remoteMetadataTransferPath);
translogTransferListener.onUploadComplete(transferSnapshot);
return true;
} else {
Expand Down Expand Up @@ -160,14 +155,16 @@ private void downloadToFS(String fileName, Path location, String primaryTerm) th
try (InputStream inputStream = transferService.downloadBlob(remoteBaseTransferPath.add(primaryTerm), fileName)) {
Files.copy(inputStream, filePath);
}
// Mark in FileTransferTracker so that the same files are not uploaded at the time of translog sync
fileTransferTracker.add(fileName, true);
}

public TranslogTransferMetadata readMetadata() throws IOException {
return transferService.listAll(remoteMetadaTransferPath)
return transferService.listAll(remoteMetadataTransferPath)
.stream()
.max(TranslogTransferMetadata.METADATA_FILENAME_COMPARATOR)
.map(filename -> {
try (InputStream inputStream = transferService.downloadBlob(remoteMetadaTransferPath, filename);) {
try (InputStream inputStream = transferService.downloadBlob(remoteMetadataTransferPath, filename);) {
IndexInput indexInput = new ByteArrayIndexInput("metadata file", inputStream.readAllBytes());
return new TranslogTransferMetadata(indexInput);
} catch (IOException e) {
Expand All @@ -191,12 +188,10 @@ private TransferFileSnapshot prepareMetadata(TransferSnapshot transferSnapshot)
);
TranslogTransferMetadata translogTransferMetadata = transferSnapshot.getTranslogTransferMetadata();
translogTransferMetadata.setGenerationToPrimaryTermMapper(new HashMap<>(generationPrimaryTermMap));
TransferFileSnapshot fileSnapshot = new TransferFileSnapshot(
return new TransferFileSnapshot(
translogTransferMetadata.getFileName(),
translogTransferMetadata.createMetadataBytes(),
translogTransferMetadata.getPrimaryTerm()
);

return fileSnapshot;
}
}
Loading