Skip to content

Commit

Permalink
Remove the explicit fsync performed after each file is copied.
Browse files Browse the repository at this point in the history
Signed-off-by: Marc Handalian <[email protected]>
  • Loading branch information
mch2 committed Feb 17, 2022
1 parent fc7ca0b commit a48d1f3
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -352,9 +352,6 @@ public void updateCurrentInfos(byte[] infosBytes, long gen) throws IOException {
toIndexInput(infosBytes),
gen);
assert gen == infos.getGeneration();
if (gen > getLatestSegmentInfos().getGeneration()) {
this.store.syncDirectory();
}
externalReaderManager.internalReaderManager.setCurrentInfos(infos);
externalReaderManager.maybeRefresh();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,9 @@ private void innerWriteFileChunk(StoreFileMetadata fileMetadata, long position,
+ temporaryFileName
+ "] in "
+ Arrays.toString(store.directory().listAll());
store.directory().sync(Collections.singleton(temporaryFileName));
// TODO: Segrep - toggle this with a setting. With segrep we don't want this fsync; we will only
// fsync when a new checkpoint is received.
// store.directory().sync(Collections.singleton(temporaryFileName));
IndexOutput remove = removeOpenIndexOutputs(name);
assert remove == null || remove == indexOutput; // remove maybe null if we got finished
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.index.IndexFormatTooNewException;
import org.apache.lucene.index.IndexFormatTooOldException;
import org.apache.lucene.store.Directory;
import org.opensearch.OpenSearchException;
import org.opensearch.action.ActionListener;
import org.opensearch.action.StepListener;
Expand All @@ -59,6 +60,7 @@
import org.opensearch.indices.replication.checkpoint.TransportCheckpointInfoResponse;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
Expand Down Expand Up @@ -187,7 +189,13 @@ public void finalizeReplication(TransportCheckpointInfoResponse checkpointInfo,
} else {
assert indexShard.assertRetentionLeasesPersisted();
}
indexShard.updateCurrentInfos(checkpointInfo.getCheckpoint().getSegmentsGen(), checkpointInfo.getInfosBytes());
final long segmentsGen = checkpointInfo.getCheckpoint().getSegmentsGen();
// force an fsync if we are receiving a new gen.
if (segmentsGen > indexShard.getLatestSegmentInfos().getGeneration()) {
final Directory directory = store().directory();
directory.sync(Arrays.asList(directory.listAll()));
}
indexShard.updateCurrentInfos(segmentsGen, checkpointInfo.getInfosBytes());
} catch (CorruptIndexException | IndexFormatTooNewException | IndexFormatTooOldException ex) {
// this is a fatal exception at this stage.
// this means we transferred files from the remote that have not be checksummed and they are
Expand Down

0 comments on commit a48d1f3

Please sign in to comment.