Commit 386cb45

[Segment Replication] Prevent store clean up on reader close action

Signed-off-by: Suraj Singh <[email protected]>
dreamer-89 committed Jul 6, 2023
1 parent 4657fe7 commit 386cb45

Showing 7 changed files with 343 additions and 17 deletions.
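At a glance, the commit changes the cleanup that runs when an NRT reader is released: instead of re-running the full cleanupAndPreserveLatestCommitPoint pass against the latest and last-committed SegmentInfos, only the files just released by that reader are offered for deletion. A minimal sketch of the release action after the change (condensed from the NRTReplicationEngine hunk below; the surrounding reader-manager wiring is assumed):

(files) -> {
    store.decRefFileDeleter(files);
    try {
        // Previously: store.cleanupAndPreserveLatestCommitPoint("On reader closed", ...)
        // Now only the files released by this reader are candidates for deletion.
        store.cleanupUnReferencedFiles("On reader closed", files);
    } catch (IOException e) {
        // Log but do not rethrow - cleanup can be retried after the next replication cycle.
    }
}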
@@ -25,6 +25,7 @@
import org.opensearch.action.admin.indices.flush.FlushRequest;
import org.opensearch.action.admin.indices.stats.IndicesStatsRequest;
import org.opensearch.action.admin.indices.stats.IndicesStatsResponse;
import org.opensearch.action.index.IndexResponse;
import org.opensearch.action.search.CreatePitAction;
import org.opensearch.action.search.CreatePitRequest;
import org.opensearch.action.search.CreatePitResponse;
@@ -79,6 +80,7 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import static java.util.Arrays.asList;
@@ -272,6 +274,106 @@ public void testIndexReopenClose() throws Exception {
verifyStoreContent();
}

public void testConcurrentIndexAndSearch() throws Exception {
final String primary = internalCluster().startDataOnlyNode();
final String replica = internalCluster().startDataOnlyNode();
createIndex(INDEX_NAME);
ensureGreen(INDEX_NAME);

final List<ActionFuture<IndexResponse>> pendingIndexResponses = new ArrayList<>();
final List<ActionFuture<SearchResponse>> pendingSearchResponse = new ArrayList<>();
final int searchCount = randomIntBetween(100, 200);
final WriteRequest.RefreshPolicy refreshPolicy = randomFrom(WriteRequest.RefreshPolicy.values());

for (int i = 0; i < searchCount; i++) {
pendingIndexResponses.add(
client().prepareIndex(INDEX_NAME)
.setId(Integer.toString(i))
.setRefreshPolicy(refreshPolicy)
.setSource("field", "value" + i)
.execute()
);
flush(INDEX_NAME);
forceMerge();
}

final SearchResponse searchResponse = client().prepareSearch()
.setQuery(matchAllQuery())
.setIndices(INDEX_NAME)
.setRequestCache(false)
.setScroll(TimeValue.timeValueDays(1))
.setSize(10)
.get();

for (int i = searchCount; i < searchCount * 2; i++) {
pendingIndexResponses.add(
client().prepareIndex(INDEX_NAME)
.setId(Integer.toString(i))
.setRefreshPolicy(refreshPolicy)
.setSource("field", "value" + i)
.execute()
);
}
flush(INDEX_NAME);
forceMerge();
client().prepareClearScroll().addScrollId(searchResponse.getScrollId()).get();

assertBusy(() -> {
client().admin().indices().prepareRefresh().execute().actionGet();
assertTrue(pendingIndexResponses.stream().allMatch(ActionFuture::isDone));
assertTrue(pendingSearchResponse.stream().allMatch(ActionFuture::isDone));
}, 1, TimeUnit.MINUTES);
logger.info("--> Cluster state {}", client().admin().cluster().prepareState().execute().actionGet().getState());
verifyStoreContent();
}

public void testScrollWithConcurrentIndexAndSearch() throws Exception {
final List<ActionFuture<IndexResponse>> pendingIndexResponses = new ArrayList<>();
final List<ActionFuture<SearchResponse>> pendingSearchResponse = new ArrayList<>();
final int searchCount = randomIntBetween(100, 200);
final WriteRequest.RefreshPolicy refreshPolicy = randomFrom(WriteRequest.RefreshPolicy.values());

for (int i = 0; i < searchCount; i++) {
pendingIndexResponses.add(
client().prepareIndex(INDEX_NAME)
.setId(Integer.toString(i))
.setRefreshPolicy(refreshPolicy)
.setSource("field", "value" + i)
.execute()
);
flush(INDEX_NAME);
forceMerge();
}

final SearchResponse searchResponse = client().prepareSearch()
.setQuery(matchAllQuery())
.setIndices(INDEX_NAME)
.setRequestCache(false)
.setScroll(TimeValue.timeValueDays(1))
.setSize(10)
.get();

for (int i = searchCount; i < searchCount * 2; i++) {
pendingIndexResponses.add(
client().prepareIndex(INDEX_NAME)
.setId(Integer.toString(i))
.setRefreshPolicy(refreshPolicy)
.setSource("field", "value" + i)
.execute()
);
}
flush(INDEX_NAME);
forceMerge();
client().prepareClearScroll().addScrollId(searchResponse.getScrollId()).get();

assertBusy(() -> {
client().admin().indices().prepareRefresh().execute().actionGet();
assertTrue(pendingIndexResponses.stream().allMatch(ActionFuture::isDone));
assertTrue(pendingSearchResponse.stream().allMatch(ActionFuture::isDone));
}, 1, TimeUnit.MINUTES);
verifyStoreContent();
}

public void testMultipleShards() throws Exception {
Settings indexSettings = Settings.builder()
.put(super.indexSettings())
@@ -126,12 +126,7 @@ private NRTReplicationReaderManager buildReaderManager() throws IOException {
(files) -> {
store.decRefFileDeleter(files);
try {
store.cleanupAndPreserveLatestCommitPoint(
"On reader closed",
getLatestSegmentInfos(),
getLastCommittedSegmentInfos(),
false
);
store.cleanupUnReferencedFiles("On reader closed", files);
} catch (IOException e) {
// Log but do not rethrow - we can try cleaning up again after next replication cycle.
// If that were to fail, the shard will as well.
@@ -147,9 +142,9 @@ public TranslogManager translogManager() {
}

public synchronized void updateSegments(final SegmentInfos infos) throws IOException {
// Update the current infos reference on the Engine's reader.
ensureOpen();
try (ReleasableLock lock = writeLock.acquire()) {
// Update the current infos reference on the Engine's reader.
ensureOpen();
final long maxSeqNo = Long.parseLong(infos.userData.get(MAX_SEQ_NO));
final long incomingGeneration = infos.getGeneration();
readerManager.updateSegments(infos);
68 changes: 68 additions & 0 deletions server/src/main/java/org/opensearch/index/store/Store.java
@@ -50,6 +50,7 @@
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.BufferedChecksum;
import org.apache.lucene.store.BufferedChecksumIndexInput;
import org.apache.lucene.store.ByteArrayDataInput;
import org.apache.lucene.store.ChecksumIndexInput;
import org.apache.lucene.store.Directory;
@@ -64,6 +65,7 @@
import org.apache.lucene.util.BytesRefBuilder;
import org.apache.lucene.util.Version;
import org.opensearch.ExceptionsHelper;
import org.opensearch.common.CheckedConsumer;
import org.opensearch.common.Nullable;
import org.opensearch.common.UUIDs;
import org.opensearch.common.bytes.BytesReference;
@@ -837,6 +839,25 @@ public void cleanupAndPreserveLatestCommitPoint(
}
}

/**
 * Segment replication method.
 *
 * Performs cleanup of unreferenced files. Intended to be used as the reader release action.
 *
 * @param reason Reason for the cleanup
 * @param filesToConsider Files to consider for cleanup
 * @throws IOException Exception from the cleanup operation
 */
public void cleanupUnReferencedFiles(String reason, Collection<String> filesToConsider) throws IOException {
assert indexSettings.isSegRepEnabled();
metadataLock.writeLock().lock();
try (Lock writeLock = directory.obtainLock(IndexWriter.WRITE_LOCK_NAME)) {
cleanupFiles(reason, null, filesToConsider, false);
} finally {
metadataLock.writeLock().unlock();
}
}
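A hedged usage sketch for the new method (hypothetical caller and file names, not part of this commit): the reader-release action passes exactly the files the closed reader held, and files still referenced elsewhere survive the underlying cleanupFiles pass.

// Hypothetical example - the file names are illustrative only.
List<String> releasedFiles = List.of("_1.cfe", "_1.cfs", "_1.si");
store.cleanupUnReferencedFiles("On reader closed", releasedFiles);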

private void cleanupFiles(
String reason,
Collection<String> localSnapshot,
@@ -871,6 +892,53 @@
}
}

/**
 * Used for segment replication.
 *
 * Builds a SegmentInfos instance from the given segment info bytes. The files referenced by the passed-in
 * SegmentInfos bytes are incRef'd to ensure they are not deleted while the consumer runs.
 *
 * @param tmpToFileName Map of temporary replication file names to actual file names
 * @param infosBytes byte[] of the SegmentInfos sent over by the primary, excluding the segments_N file
 * @param segmentsGen segment generation number
 * @param consumer consumer for the generated SegmentInfos
 * @throws IOException Exception while reading the store and building the segment infos
 */
public void buildInfosFromStore(
Map<String, String> tmpToFileName,
byte[] infosBytes,
long segmentsGen,
CheckedConsumer<SegmentInfos, IOException> consumer
) throws IOException {
metadataLock.writeLock().lock();
try {
final List<String> values = new ArrayList<>(tmpToFileName.values());
incRefFileDeleter(values);
try {
renameTempFilesSafe(tmpToFileName);
consumer.accept(buildSegmentInfos(infosBytes, segmentsGen));
} finally {
decRefFileDeleter(values);
}
} finally {
metadataLock.writeLock().unlock();
}
}
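A sketch of how a caller might drive this method (names mirror the SegmentReplicationTarget hunk further down; treat it as illustrative rather than the exact call site): temporary replication files are renamed to their final names under the metadata write lock, the SegmentInfos is rebuilt from the primary's bytes, and the consumer runs while the referenced files remain incRef'd.

store.buildInfosFromStore(
    multiFileWriter.getTempFileNames(),                       // temp name -> final name
    checkpointInfoResponse.getInfosBytes(),                   // serialized SegmentInfos from the primary
    checkpointInfoResponse.getCheckpoint().getSegmentsGen(),  // expected segments_N generation
    infos -> indexShard.finalizeReplication(infos)            // consumer invoked while files are incRef'd
);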

private SegmentInfos buildSegmentInfos(byte[] infosBytes, long segmentsGen) throws IOException {
try (final ChecksumIndexInput input = toIndexInput(infosBytes)) {
return SegmentInfos.readCommit(directory, input, segmentsGen);
}
}

/**
 * Wraps a byte[] containing the primary's SegmentInfos into Lucene's {@link ChecksumIndexInput} so that it can be
 * passed to SegmentInfos.readCommit
 */
private ChecksumIndexInput toIndexInput(byte[] input) {
return new BufferedChecksumIndexInput(new ByteArrayIndexInput("Snapshot of SegmentInfos", input));
}

// pkg private for testing
final void verifyAfterCleanup(MetadataSnapshot sourceMetadata, MetadataSnapshot targetMetadata) {
final RecoveryDiff recoveryDiff = targetMetadata.recoveryDiff(sourceMetadata);
@@ -97,6 +97,10 @@ String getTempNameForFile(String origFile) {
return tempFilePrefix + origFile;
}

public Map<String, String> getTempFileNames() {
return tempFileNames;
}

public IndexOutput getOpenIndexOutput(String key) {
ensureOpen.run();
return openIndexOutputs.get(key);
@@ -21,6 +21,7 @@
import org.opensearch.OpenSearchException;
import org.opensearch.action.ActionListener;
import org.opensearch.action.StepListener;
import org.opensearch.common.CheckedConsumer;
import org.opensearch.common.UUIDs;
import org.opensearch.common.bytes.BytesReference;
import org.opensearch.common.lucene.Lucene;
@@ -221,18 +222,15 @@ private void finalizeReplication(CheckpointInfoResponse checkpointInfoResponse,
state.setStage(SegmentReplicationState.Stage.FINALIZE_REPLICATION);
Store store = null;
try {
multiFileWriter.renameAllTempFiles();
store = store();
store.incRef();
// Deserialize the new SegmentInfos object sent from the primary.
final ReplicationCheckpoint responseCheckpoint = checkpointInfoResponse.getCheckpoint();
SegmentInfos infos = SegmentInfos.readCommit(
store.directory(),
toIndexInput(checkpointInfoResponse.getInfosBytes()),
responseCheckpoint.getSegmentsGen()
CheckedConsumer<SegmentInfos, IOException> finalizeReplication = indexShard::finalizeReplication;
store.buildInfosFromStore(
multiFileWriter.getTempFileNames(),
checkpointInfoResponse.getInfosBytes(),
checkpointInfoResponse.getCheckpoint().getSegmentsGen(),
finalizeReplication
);
cancellableThreads.checkForCancel();
indexShard.finalizeReplication(infos);
} catch (CorruptIndexException | IndexFormatTooNewException | IndexFormatTooOldException ex) {
// this is a fatal exception at this stage.
// this means we transferred files from the remote that have not be checksummed and they are
@@ -160,6 +160,38 @@ public void testUpdateSegments_replicaReceivesSISWithLowerGen() throws IOException {
}
}

public void testSimultaneousEngineCloseAndCommit() throws IOException, InterruptedException {
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
try (
final Store nrtEngineStore = createStore(INDEX_SETTINGS, newDirectory());
final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore)
) {
CountDownLatch latch = new CountDownLatch(1);
Thread commitThread = new Thread(() -> {
try {
nrtEngine.updateSegments(store.readLastCommittedSegmentsInfo());
latch.countDown();
} catch (IOException e) {
throw new RuntimeException(e);
}
});
Thread closeThread = new Thread(() -> {
try {
latch.await();
nrtEngine.close();
} catch (IOException e) {
throw new RuntimeException(e);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
commitThread.start();
closeThread.start();
commitThread.join();
closeThread.join();
}
}

public void testUpdateSegments_replicaCommitsFirstReceivedInfos() throws IOException {
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
