Skip to content

Commit

Permalink
Integrate remote translog download on failover
Browse files Browse the repository at this point in the history
Signed-off-by: Ashish Singh <[email protected]>
  • Loading branch information
ashking94 committed Jan 4, 2023
1 parent 802c5d4 commit 3276c06
Show file tree
Hide file tree
Showing 7 changed files with 32 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -699,7 +699,7 @@ public static final IndexShard newIndexShard(
() -> {},
RetentionLeaseSyncer.EMPTY,
cbs,
new InternalTranslogFactory(),
(idxSettings, shardRouting) -> new InternalTranslogFactory(),
SegmentReplicationCheckpointPublisher.EMPTY,
null
);
Expand Down
20 changes: 12 additions & 8 deletions server/src/main/java/org/opensearch/index/IndexService.java
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiFunction;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;
import java.util.function.Function;
Expand Down Expand Up @@ -524,13 +525,16 @@ public synchronized IndexShard createShard(
remoteStore = new Store(shardId, this.indexSettings, remoteDirectory, lock, Store.OnClose.EMPTY);
}

TranslogFactory translogFactory = this.indexSettings.isRemoteTranslogStoreEnabled() && routing.primary()
? new RemoteBlobStoreInternalTranslogFactory(
repositoriesServiceSupplier,
threadPool,
this.indexSettings.getRemoteStoreTranslogRepository()
)
: new InternalTranslogFactory();
BiFunction<IndexSettings, ShardRouting, TranslogFactory> translogFactorySupplier = (idxSettings, shardRouting) -> {
if (idxSettings.isRemoteTranslogStoreEnabled() && shardRouting.primary()) {
return new RemoteBlobStoreInternalTranslogFactory(
repositoriesServiceSupplier,
threadPool,
idxSettings.getRemoteStoreTranslogRepository()
);
}
return new InternalTranslogFactory();
};

Directory directory = directoryFactory.newDirectory(this.indexSettings, path);
store = new Store(
Expand Down Expand Up @@ -562,7 +566,7 @@ public synchronized IndexShard createShard(
() -> globalCheckpointSyncer.accept(shardId),
retentionLeaseSyncer,
circuitBreakerService,
translogFactory,
translogFactorySupplier,
this.indexSettings.isSegRepEnabled() ? checkpointPublisher : null,
remoteStore
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,7 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.LongSupplier;
Expand Down Expand Up @@ -319,7 +320,7 @@ Runnable getGlobalCheckpointSyncer() {
private final RefreshPendingLocationListener refreshPendingLocationListener;
private volatile boolean useRetentionLeasesInPeerRecovery;
private final Store remoteStore;
private final TranslogFactory translogFactory;
private final BiFunction<IndexSettings, ShardRouting, TranslogFactory> translogFactorySupplier;

public IndexShard(
final ShardRouting shardRouting,
Expand All @@ -342,7 +343,7 @@ public IndexShard(
final Runnable globalCheckpointSyncer,
final RetentionLeaseSyncer retentionLeaseSyncer,
final CircuitBreakerService circuitBreakerService,
final TranslogFactory translogFactory,
final BiFunction<IndexSettings, ShardRouting, TranslogFactory> translogFactorySupplier,
@Nullable final SegmentReplicationCheckpointPublisher checkpointPublisher,
@Nullable final Store remoteStore
) throws IOException {
Expand Down Expand Up @@ -429,7 +430,7 @@ public boolean shouldCache(Query query) {
this.refreshPendingLocationListener = new RefreshPendingLocationListener();
this.checkpointPublisher = checkpointPublisher;
this.remoteStore = remoteStore;
this.translogFactory = translogFactory;
this.translogFactorySupplier = translogFactorySupplier;
}

public ThreadPool getThreadPool() {
Expand Down Expand Up @@ -3352,7 +3353,7 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) thro
() -> getOperationPrimaryTerm(),
tombstoneDocSupplier(),
indexSettings.isSegRepEnabled() && shardRouting.primary() == false,
translogFactory
translogFactorySupplier.apply(indexSettings, shardRouting)
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -488,7 +488,7 @@ private void syncTranslogFilesFromRemoteTranslog(IndexShard indexShard, Reposito
shardId,
fileTransferTracker
);
RemoteFsTranslog.download(translogTransferManager, indexShard.shardPath().resolveTranslog());
RemoteFsTranslog.download(translogTransferManager, indexShard.shardPath().resolveTranslog(), fileTransferTracker);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public RemoteFsTranslog(
this.translogTransferManager = buildTranslogTransferManager(blobStoreRepository, executorService, shardId, fileTransferTracker);

try {
download(translogTransferManager, location);
download(translogTransferManager, location, fileTransferTracker);
Checkpoint checkpoint = readCheckpoint(location);
this.readers.addAll(recoverFromFiles(checkpoint));
if (readers.isEmpty()) {
Expand Down Expand Up @@ -96,7 +96,8 @@ public RemoteFsTranslog(
}
}

public static void download(TranslogTransferManager translogTransferManager, Path location) throws IOException {
public static void download(TranslogTransferManager translogTransferManager, Path location, FileTransferTracker fileTransferTracker)
throws IOException {

TranslogTransferMetadata translogMetadata = translogTransferManager.readMetadata();
if (translogMetadata != null) {
Expand All @@ -118,6 +119,11 @@ public static void download(TranslogTransferManager translogTransferManager, Pat
location.resolve(Translog.getCommitCheckpointFileName(translogMetadata.getGeneration())),
location.resolve(Translog.CHECKPOINT_FILE_NAME)
);

// Mark in FileTransferTracker so that the same files are not uploaded at the time of translog sync
for (Path file : FileSystemUtils.files(location)) {
fileTransferTracker.add(file.getFileName().toString());
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ public void onSuccess(TransferFileSnapshot fileSnapshot) {
});
}

/**
 * Records {@code file} in this tracker with state {@link TransferState#SUCCESS}.
 *
 * <p>Lets a caller mark a file as already transferred without going through the
 * {@code onSuccess} callback, so that a later upload pass can skip it.
 * NOTE(review): presumably any state previously recorded for the same name is
 * overwritten (map {@code put} semantics) — confirm against the field's declared type.
 *
 * @param file name of the file to mark as successfully transferred; callers are
 *             expected to pass the bare file name rather than a full path — TODO confirm
 */
public void add(String file) {
fileTransferTracker.put(file, TransferState.SUCCESS);
}

@Override
public void onFailure(TransferFileSnapshot fileSnapshot, Exception e) {
TransferState targetState = TransferState.FAILED;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -568,7 +568,7 @@ protected IndexShard newShard(
globalCheckpointSyncer,
retentionLeaseSyncer,
breakerService,
new InternalTranslogFactory(),
(idxSettings, shardRouting) -> new InternalTranslogFactory(),
checkpointPublisher,
remoteStore
);
Expand Down

0 comments on commit 3276c06

Please sign in to comment.