diff --git a/server/src/main/java/org/opensearch/index/engine/NRTReplicationReaderManager.java b/server/src/main/java/org/opensearch/index/engine/NRTReplicationReaderManager.java index 268ba1a436393..35409437f605a 100644 --- a/server/src/main/java/org/opensearch/index/engine/NRTReplicationReaderManager.java +++ b/server/src/main/java/org/opensearch/index/engine/NRTReplicationReaderManager.java @@ -98,12 +98,12 @@ protected OpenSearchDirectoryReader refreshIfNeeded(OpenSearchDirectoryReader re * @param infos {@link SegmentInfos} infos * @throws IOException - When Refresh fails with an IOException. */ - public synchronized void updateSegments(SegmentInfos infos) throws IOException { + public void updateSegments(SegmentInfos infos) throws IOException { // roll over the currentInfo's generation, this ensures the on-disk gen // is always increased. infos.updateGeneration(currentInfos); currentInfos = infos; - maybeRefresh(); + maybeRefreshBlocking(); } public SegmentInfos getSegmentInfos() { diff --git a/server/src/test/java/org/opensearch/index/engine/NRTReplicationReaderManagerTests.java b/server/src/test/java/org/opensearch/index/engine/NRTReplicationReaderManagerTests.java new file mode 100644 index 0000000000000..98f1a416731e4 --- /dev/null +++ b/server/src/test/java/org/opensearch/index/engine/NRTReplicationReaderManagerTests.java @@ -0,0 +1,88 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.engine; + +import org.apache.lucene.index.DirectoryReader; +import org.apache.lucene.index.SegmentInfos; +import org.apache.lucene.index.StandardDirectoryReader; +import org.apache.lucene.util.Version; +import org.opensearch.common.lucene.index.OpenSearchDirectoryReader; +import org.opensearch.index.store.Store; + +import java.io.IOException; + +public class NRTReplicationReaderManagerTests extends EngineTestCase { + + public void testCreateNRTreaderManager() throws IOException { + try (final Store store = createStore()) { + store.createEmpty(Version.LATEST); + final DirectoryReader reader = DirectoryReader.open(store.directory()); + final SegmentInfos initialInfos = ((StandardDirectoryReader) reader).getSegmentInfos(); + NRTReplicationReaderManager readerManager = new NRTReplicationReaderManager( + OpenSearchDirectoryReader.wrap(reader, shardId), + (files) -> {}, + (files) -> {} + ); + assertEquals(initialInfos, readerManager.getSegmentInfos()); + try (final OpenSearchDirectoryReader acquire = readerManager.acquire()) { + assertNull(readerManager.refreshIfNeeded(acquire)); + } + + // create an updated infos + final SegmentInfos infos_2 = readerManager.getSegmentInfos().clone(); + infos_2.changed(); + + readerManager.updateSegments(infos_2); + assertEquals(infos_2, readerManager.getSegmentInfos()); + try (final OpenSearchDirectoryReader acquire = readerManager.acquire()) { + final StandardDirectoryReader standardReader = NRTReplicationReaderManager.unwrapStandardReader(acquire); + assertEquals(infos_2, standardReader.getSegmentInfos()); + } + } + } + + public void testUpdateSegmentsWhileRefreshing() throws IOException, InterruptedException { + try (final Store store = createStore()) { + store.createEmpty(Version.LATEST); + final DirectoryReader reader = DirectoryReader.open(store.directory()); + NRTReplicationReaderManager readerManager = new NRTReplicationReaderManager( + OpenSearchDirectoryReader.wrap(reader, shardId), + (files) -> {}, + (files) -> {} + ); + + final SegmentInfos infos_2 = readerManager.getSegmentInfos().clone(); + infos_2.changed(); + + Thread refreshThread = new Thread(() -> { + try { + readerManager.maybeRefresh(); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); + Thread updateThread = new Thread(() -> { + try { + readerManager.updateSegments(infos_2); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); + refreshThread.start(); + updateThread.start(); + refreshThread.join(); + updateThread.join(); + try (final OpenSearchDirectoryReader acquire = readerManager.acquire()) { + final StandardDirectoryReader standardReader = NRTReplicationReaderManager.unwrapStandardReader(acquire); + assertEquals(infos_2.version, standardReader.getSegmentInfos().version); + } + assertEquals(infos_2, readerManager.getSegmentInfos()); + } + } +}