
[Segment Replication] Add PIT/Scroll compatibility with Segment Replication #6644 (#6765)

* Segment Replication - PIT/Scroll compatibility.

This change updates PIT/Scroll queries to be compatible with Segment Replication.
It does this by ref-counting segment files when a new reader is created, and discarding those
files only after the reader is closed.

Signed-off-by: Marc Handalian <[email protected]>
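
The mechanism is easiest to see in a minimal, self-contained sketch (FileRefCounts and its method names are hypothetical; the PR's actual implementation is the ReplicaFileTracker and Store changes in the diff below): every file referenced by an open reader carries a reference count, and cleanup may only delete files that no reader still references.

import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Minimal sketch of the ref-counting approach; not the PR's actual class.
final class FileRefCounts {
    private final Map<String, Integer> counts = new HashMap<>();

    // A new PIT/Scroll reader was opened over these segment files.
    synchronized void onReaderOpen(Collection<String> files) {
        for (String f : files) {
            counts.merge(f, 1, Integer::sum);
        }
    }

    // A reader was closed; release its references.
    synchronized void onReaderClose(Collection<String> files) {
        for (String f : files) {
            counts.computeIfPresent(f, (k, v) -> v == 1 ? null : v - 1);
        }
    }

    // Cleanup may delete a file only when no open reader references it.
    synchronized boolean canDelete(String file) {
        return counts.containsKey(file) == false;
    }

    public static void main(String[] args) {
        FileRefCounts tracker = new FileRefCounts();
        List<String> files = List.of("_0.cfs", "_0.cfe", "_0.si");
        tracker.onReaderOpen(files);                      // PIT/Scroll reader opens
        System.out.println(tracker.canDelete("_0.cfs"));  // false - still referenced
        tracker.onReaderClose(files);                     // PIT expires, reader closes
        System.out.println(tracker.canDelete("_0.cfs"));  // true - safe to delete
    }
}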

* Fix broken test.

Signed-off-by: Marc Handalian <[email protected]>

* Fix test bug with PIT where snapshotted segments are queried instead of current store state.

Signed-off-by: Marc Handalian <[email protected]>

* Address review comments and prevent temp file deletion during reader close

Signed-off-by: Suraj Singh <[email protected]>

* Fix precommit failure

Signed-off-by: Suraj Singh <[email protected]>

* Use last committed segment infos reference from replication engine

Signed-off-by: Suraj Singh <[email protected]>

* Clean up and prevent incref on segment info file copied from primary

Signed-off-by: Suraj Singh <[email protected]>

* Fix failing test

Signed-off-by: Suraj Singh <[email protected]>

---------

Signed-off-by: Marc Handalian <[email protected]>
Signed-off-by: Suraj Singh <[email protected]>
Co-authored-by: Marc Handalian <[email protected]>
dreamer-89 and mch2 authored Apr 5, 2023
1 parent 65d9d02 commit 4b7cb02
Showing 7 changed files with 529 additions and 11 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -91,6 +91,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Remove 'cluster_manager' role attachment when using 'node.master' deprecated setting ([#6331](https://github.com/opensearch-project/OpenSearch/pull/6331))
- Add new cluster settings to ignore weighted round-robin routing and fallback to default behaviour. ([#6834](https://github.com/opensearch-project/OpenSearch/pull/6834))
- Add experimental support for ZSTD compression. ([#3577](https://github.com/opensearch-project/OpenSearch/pull/3577))
- [Segment Replication] Add point in time and scroll query compatibility. ([#6644](https://github.com/opensearch-project/OpenSearch/pull/6644))

### Dependencies
- Bump `org.apache.logging.log4j:log4j-core` from 2.18.0 to 2.20.0 ([#6490](https://github.com/opensearch-project/OpenSearch/pull/6490))

Large diffs are not rendered by default.

server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java
@@ -68,7 +68,7 @@ public NRTReplicationEngine(EngineConfig engineConfig) {
WriteOnlyTranslogManager translogManagerRef = null;
try {
lastCommittedSegmentInfos = store.readLastCommittedSegmentsInfo();
readerManager = new NRTReplicationReaderManager(OpenSearchDirectoryReader.wrap(getDirectoryReader(), shardId));
readerManager = buildReaderManager();
final SequenceNumbers.CommitInfo commitInfo = SequenceNumbers.loadSeqNoInfoFromLuceneCommit(
this.lastCommittedSegmentInfos.getUserData().entrySet()
);
@@ -119,6 +119,28 @@ public void onAfterTranslogSync() {
}
}

private NRTReplicationReaderManager buildReaderManager() throws IOException {
return new NRTReplicationReaderManager(
OpenSearchDirectoryReader.wrap(getDirectoryReader(), shardId),
store::incRefFileDeleter,
(files) -> {
store.decRefFileDeleter(files);
try {
store.cleanupAndPreserveLatestCommitPoint(
"On reader closed",
getLatestSegmentInfos(),
getLastCommittedSegmentInfos(),
false
);
} catch (IOException e) {
// Log but do not rethrow - we can try cleaning up again after the next replication cycle.
// If that also fails, the shard will fail as well.
logger.error("Unable to clean store after reader closed", e);
}
}
);
}

@Override
public TranslogManager translogManager() {
return translogManager;
server/src/main/java/org/opensearch/index/engine/NRTReplicationReaderManager.java
@@ -22,8 +22,10 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.function.Consumer;

/**
* This is an extension of {@link OpenSearchReaderManager} for use with {@link NRTReplicationEngine}.
@@ -35,17 +37,27 @@ public class NRTReplicationReaderManager extends OpenSearchReaderManager {

private final static Logger logger = LogManager.getLogger(NRTReplicationReaderManager.class);
private volatile SegmentInfos currentInfos;
private Consumer<Collection<String>> onReaderClosed;
private Consumer<Collection<String>> onNewReader;

/**
* Creates and returns a new NRTReplicationReaderManager from the given
* already-opened {@link OpenSearchDirectoryReader}, stealing
* the incoming reference.
*
* @param reader the SegmentReplicationReaderManager to use for future reopens
* @param reader - The OpenSearchDirectoryReader to use for future reopens.
* @param onNewReader - Called when a new reader is created.
* @param onReaderClosed - Called when a reader is closed.
*/
NRTReplicationReaderManager(OpenSearchDirectoryReader reader) {
NRTReplicationReaderManager(
OpenSearchDirectoryReader reader,
Consumer<Collection<String>> onNewReader,
Consumer<Collection<String>> onReaderClosed
) {
super(reader);
currentInfos = unwrapStandardReader(reader).getSegmentInfos();
this.onNewReader = onNewReader;
this.onReaderClosed = onReaderClosed;
}

@Override
@@ -60,6 +72,7 @@ protected OpenSearchDirectoryReader refreshIfNeeded(OpenSearchDirectoryReader re
for (LeafReaderContext ctx : standardDirectoryReader.leaves()) {
subs.add(ctx.reader());
}
final Collection<String> files = currentInfos.files(false);
DirectoryReader innerReader = StandardDirectoryReader.open(referenceToRefresh.directory(), currentInfos, subs, null);
final DirectoryReader softDeletesDirectoryReaderWrapper = new SoftDeletesDirectoryReaderWrapper(
innerReader,
@@ -68,7 +81,13 @@ protected OpenSearchDirectoryReader refreshIfNeeded(OpenSearchDirectoryReader re
logger.trace(
() -> new ParameterizedMessage("updated to SegmentInfosVersion=" + currentInfos.getVersion() + " reader=" + innerReader)
);
return OpenSearchDirectoryReader.wrap(softDeletesDirectoryReaderWrapper, referenceToRefresh.shardId());
final OpenSearchDirectoryReader reader = OpenSearchDirectoryReader.wrap(
softDeletesDirectoryReaderWrapper,
referenceToRefresh.shardId()
);
onNewReader.accept(files);
OpenSearchDirectoryReader.addReaderCloseListener(reader, key -> onReaderClosed.accept(files));
return reader;
}

/**
@@ -89,7 +108,7 @@ public SegmentInfos getSegmentInfos() {
return currentInfos;
}

private StandardDirectoryReader unwrapStandardReader(OpenSearchDirectoryReader reader) {
public static StandardDirectoryReader unwrapStandardReader(OpenSearchDirectoryReader reader) {
final DirectoryReader delegate = reader.getDelegate();
if (delegate instanceof SoftDeletesDirectoryReaderWrapper) {
return (StandardDirectoryReader) ((SoftDeletesDirectoryReaderWrapper) delegate).getDelegate();
51 changes: 51 additions & 0 deletions server/src/main/java/org/opensearch/index/store/ReplicaFileTracker.java
@@ -0,0 +1,51 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.index.store;

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

/**
* This class is a version of Lucene's ReplicaFileDeleter class used to keep track of
* segment files that should be preserved on replicas between replication events.
* The difference is that this component does not actually perform any deletions; it only tracks refcounts.
* Deletions are made through Store.java.
*
* https://github.com/apache/lucene/blob/main/lucene/replicator/src/java/org/apache/lucene/replicator/nrt/ReplicaFileDeleter.java
*
* @opensearch.internal
*/
final class ReplicaFileTracker {

private final Map<String, Integer> refCounts = new HashMap<>();

public synchronized void incRef(Collection<String> fileNames) {
for (String fileName : fileNames) {
refCounts.merge(fileName, 1, Integer::sum);
}
}

public synchronized void decRef(Collection<String> fileNames) {
for (String fileName : fileNames) {
Integer curCount = refCounts.get(fileName);
assert curCount != null : "fileName=" + fileName;
assert curCount > 0;
if (curCount == 1) {
refCounts.remove(fileName);
} else {
refCounts.put(fileName, curCount - 1);
}
}
}

public synchronized boolean canDelete(String fileName) {
return refCounts.containsKey(fileName) == false;
}
}
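
A hypothetical usage sequence (not part of the PR; it assumes a class in the same org.opensearch.index.store package, since ReplicaFileTracker is package-private) showing why per-file counts matter when two readers overlap on the same segment files:

package org.opensearch.index.store;

import java.util.List;

// Hypothetical example class illustrating overlapping reader references.
class ReplicaFileTrackerExample {
    public static void main(String[] args) {
        ReplicaFileTracker tracker = new ReplicaFileTracker();
        tracker.incRef(List.of("_1.cfs", "_1.si"));      // reader A opens
        tracker.incRef(List.of("_1.cfs"));               // reader B shares _1.cfs
        tracker.decRef(List.of("_1.cfs", "_1.si"));      // reader A closes
        System.out.println(tracker.canDelete("_1.si"));  // true - only A referenced it
        System.out.println(tracker.canDelete("_1.cfs")); // false - B still holds a reference
        tracker.decRef(List.of("_1.cfs"));               // reader B closes
        System.out.println(tracker.canDelete("_1.cfs")); // true - now safe to delete
    }
}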
64 changes: 59 additions & 5 deletions server/src/main/java/org/opensearch/index/store/Store.java
@@ -124,6 +124,7 @@
import static java.util.Collections.unmodifiableMap;
import static org.opensearch.index.seqno.SequenceNumbers.LOCAL_CHECKPOINT_KEY;
import static org.opensearch.index.store.Store.MetadataSnapshot.loadMetadata;
import static org.opensearch.indices.replication.SegmentReplicationTarget.REPLICATION_PREFIX;

/**
* A Store provides plain access to files written by an opensearch index shard. Each shard
@@ -182,6 +183,10 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
private final ShardLock shardLock;
private final OnClose onClose;

// used to ref-count files when a new Reader is opened for PIT/Scroll queries
// prevents segment file deletion until the PIT/Scroll expires or is discarded
private final ReplicaFileTracker replicaFileTracker;

private final AbstractRefCounted refCounter = new AbstractRefCounted("store") {
@Override
protected void closeInternal() {
@@ -202,6 +207,7 @@ public Store(ShardId shardId, IndexSettings indexSettings, Directory directory,
this.directory = new StoreDirectory(sizeCachingDir, Loggers.getLogger("index.store.deletes", shardId));
this.shardLock = shardLock;
this.onClose = onClose;
this.replicaFileTracker = indexSettings.isSegRepEnabled() ? new ReplicaFileTracker() : null;

assert onClose != null;
assert shardLock != null;
@@ -782,34 +788,70 @@ public void cleanupAndVerify(String reason, MetadataSnapshot sourceMetadata) thr
}

/**
* Segment Replication method -
* Segment Replication method
* This method deletes every file in this store that is not referenced by the passed in SegmentInfos or
* part of the latest on-disk commit point.
*
* This method is used for segment replication when the in-memory SegmentInfos can be ahead of the on-disk segment files.
* In this case files from both snapshots must be preserved. Verification has been done that all files are present on disk.
* @param reason the reason for this cleanup operation logged for each deleted file
* @param infos {@link SegmentInfos} Files from this infos will be preserved on disk if present.
* @throws IllegalStateException if the latest snapshot in this store differs from the given one after the cleanup.
*/
public void cleanupAndPreserveLatestCommitPoint(String reason, SegmentInfos infos) throws IOException {
this.cleanupAndPreserveLatestCommitPoint(reason, infos, readLastCommittedSegmentsInfo(), true);
}

/**
* Segment Replication method
*
* Similar to {@link Store#cleanupAndPreserveLatestCommitPoint(String, SegmentInfos)} with extra parameters for cleanup
*
* This method deletes every file in this store, except:
* 1. Files referenced by the passed-in SegmentInfos, usually the in-memory segment infos copied from the primary
* 2. Files that are part of the passed-in lastCommittedSegmentInfos, typically the last committed segment infos
* 3. Files ref-counted by active readers for PIT/Scroll queries
* 4. Temporary replication files, when deleteTempFiles is false
*
* @param reason the reason for this cleanup operation logged for each deleted file
* @param infos {@link SegmentInfos} Files from this infos will be preserved on disk if present.
* @param lastCommittedSegmentInfos {@link SegmentInfos} Last committed segment infos
* @param deleteTempFiles Whether this cleanup should delete temporary replication files
*
* @throws IllegalStateException if the latest snapshot in this store differs from the given one after the cleanup.
*/
public void cleanupAndPreserveLatestCommitPoint(
String reason,
SegmentInfos infos,
SegmentInfos lastCommittedSegmentInfos,
boolean deleteTempFiles
) throws IOException {
assert indexSettings.isSegRepEnabled();
// fetch a snapshot from the latest on disk Segments_N file. This can be behind
// the passed in local in memory snapshot, so we want to ensure files it references are not removed.
metadataLock.writeLock().lock();
try (Lock writeLock = directory.obtainLock(IndexWriter.WRITE_LOCK_NAME)) {
cleanupFiles(reason, getMetadata(readLastCommittedSegmentsInfo()), infos.files(true));
cleanupFiles(reason, getMetadata(lastCommittedSegmentInfos), infos.files(true), deleteTempFiles);
} finally {
metadataLock.writeLock().unlock();
}
}

private void cleanupFiles(String reason, MetadataSnapshot localSnapshot, @Nullable Collection<String> additionalFiles)
throws IOException {
private void cleanupFiles(
String reason,
MetadataSnapshot localSnapshot,
@Nullable Collection<String> additionalFiles,
boolean deleteTempFiles
) throws IOException {
assert metadataLock.isWriteLockedByCurrentThread();
for (String existingFile : directory.listAll()) {
if (Store.isAutogenerated(existingFile)
|| localSnapshot.contains(existingFile)
|| (additionalFiles != null && additionalFiles.contains(existingFile))) {
|| (additionalFiles != null && additionalFiles.contains(existingFile))
// also ensure we are not deleting a file referenced by an active reader.
|| replicaFileTracker != null && replicaFileTracker.canDelete(existingFile) == false
// prevent temporary file deletion during reader cleanup
|| deleteTempFiles == false && existingFile.startsWith(REPLICATION_PREFIX)) {
// don't delete snapshot file, or the checksums file (note, this is extra protection since the Store won't delete
// checksum)
continue;
@@ -1909,4 +1951,16 @@ private static IndexWriterConfig newIndexWriterConfig() {
// we also don't specify a codec here and merges should use the engines for this index
.setMergePolicy(NoMergePolicy.INSTANCE);
}

public void incRefFileDeleter(Collection<String> files) {
if (this.indexSettings.isSegRepEnabled()) {
this.replicaFileTracker.incRef(files);
}
}

public void decRefFileDeleter(Collection<String> files) {
if (this.indexSettings.isSegRepEnabled()) {
this.replicaFileTracker.decRef(files);
}
}
}
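
Pulling the Store changes together, the keep-or-delete decision inside cleanupFiles reduces to a predicate like the one below. This is a free-standing restatement for illustration only; shouldKeep and its parameter names are hypothetical, and the real logic is the cleanupFiles method shown in the diff above.

// Hypothetical restatement of the keep/delete decision in cleanupFiles.
static boolean shouldKeep(
    String file,
    Store.MetadataSnapshot lastCommit,    // metadata of the last on-disk commit point
    Collection<String> liveInfosFiles,    // files referenced by the in-memory SegmentInfos
    ReplicaFileTracker tracker,           // null unless segment replication is enabled
    boolean deleteTempFiles
) {
    return Store.isAutogenerated(file)                                // always-kept autogenerated files
        || lastCommit.contains(file)                                  // part of the last commit point
        || (liveInfosFiles != null && liveInfosFiles.contains(file))  // referenced by in-memory infos
        || (tracker != null && tracker.canDelete(file) == false)      // held by an active PIT/Scroll reader
        || (deleteTempFiles == false && file.startsWith(SegmentReplicationTarget.REPLICATION_PREFIX));
}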
server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java
@@ -51,6 +51,8 @@ public class SegmentReplicationTarget extends ReplicationTarget {
private final SegmentReplicationState state;
protected final MultiFileWriter multiFileWriter;

public final static String REPLICATION_PREFIX = "replication.";

public ReplicationCheckpoint getCheckpoint() {
return this.checkpoint;
}
@@ -85,7 +87,7 @@ protected void closeInternal() {

@Override
protected String getPrefix() {
return "replication." + UUIDs.randomBase64UUID() + ".";
return REPLICATION_PREFIX + UUIDs.randomBase64UUID() + ".";
}

@Override
