diff --git a/CHANGELOG.md b/CHANGELOG.md
index 0041b6df93708..4fbb5fc1bac2c 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -20,6 +20,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 - Remove 'cluster_manager' role attachment when using 'node.master' deprecated setting ([#6331](https://github.com/opensearch-project/OpenSearch/pull/6331))
 - Add new cluster settings to ignore weighted round-robin routing and fallback to default behaviour. ([#6834](https://github.com/opensearch-project/OpenSearch/pull/6834))
 - Add experimental support for ZSTD compression. ([#3577](https://github.com/opensearch-project/OpenSearch/pull/3577))
+- [Segment Replication] Add point in time and scroll query compatibility. ([#6644](https://github.com/opensearch-project/OpenSearch/pull/6644))
 
 ### Dependencies
 - Bump `org.apache.logging.log4j:log4j-core` from 2.18.0 to 2.20.0 ([#6490](https://github.com/opensearch-project/OpenSearch/pull/6490))
diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java
index 59c1c45ccd3ea..59713cf0642f6 100644
--- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java
+++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java
@@ -17,8 +17,21 @@
 import org.apache.lucene.index.IndexWriter;
 import org.apache.lucene.index.IndexWriterConfig;
 import org.apache.lucene.index.SegmentInfos;
+import org.apache.lucene.index.StandardDirectoryReader;
 import org.apache.lucene.tests.util.TestUtil;
 import org.apache.lucene.util.BytesRef;
+import org.opensearch.action.ActionFuture;
+import org.opensearch.action.admin.indices.flush.FlushRequest;
+import org.opensearch.action.admin.indices.stats.IndicesStatsRequest;
+import org.opensearch.action.admin.indices.stats.IndicesStatsResponse;
+import org.opensearch.action.search.CreatePitAction;
+import org.opensearch.action.search.CreatePitRequest;
+import org.opensearch.action.search.CreatePitResponse;
+import org.opensearch.action.search.DeletePitAction;
+import org.opensearch.action.search.DeletePitRequest;
+import org.opensearch.action.search.PitTestsUtil;
+import org.opensearch.action.search.SearchResponse;
+import org.opensearch.action.search.SearchType;
 import org.opensearch.action.support.WriteRequest;
 import org.opensearch.action.update.UpdateResponse;
 import org.opensearch.client.Requests;
@@ -29,15 +42,24 @@
 import org.opensearch.cluster.routing.ShardRouting;
 import org.opensearch.cluster.routing.ShardRoutingState;
 import org.opensearch.cluster.routing.allocation.command.CancelAllocationCommand;
+import org.opensearch.common.io.stream.NamedWriteableRegistry;
+import org.opensearch.common.lucene.index.OpenSearchDirectoryReader;
 import org.opensearch.common.settings.Settings;
+import org.opensearch.common.unit.TimeValue;
 import org.opensearch.index.IndexModule;
 import org.opensearch.index.SegmentReplicationPerGroupStats;
 import org.opensearch.index.SegmentReplicationPressureService;
 import org.opensearch.index.SegmentReplicationShardStats;
+import org.opensearch.index.engine.Engine;
+import org.opensearch.index.engine.NRTReplicationReaderManager;
 import org.opensearch.index.shard.IndexShard;
 import org.opensearch.index.shard.ShardId;
 import org.opensearch.indices.recovery.FileChunkRequest;
 import org.opensearch.indices.replication.common.ReplicationType;
+import org.opensearch.search.SearchService;
+import org.opensearch.search.builder.PointInTimeBuilder;
+import org.opensearch.search.internal.PitReaderContext;
+import org.opensearch.search.sort.SortOrder;
 import org.opensearch.node.NodeClosedException;
 import org.opensearch.test.BackgroundIndexer;
 import org.opensearch.test.InternalTestCluster;
@@ -46,14 +68,23 @@
 import org.opensearch.transport.TransportService;
 
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
+import java.util.stream.Collectors;
 
 import static java.util.Arrays.asList;
+import static org.opensearch.action.search.PitTestsUtil.assertSegments;
+import static org.opensearch.action.search.SearchContextId.decode;
+import static org.opensearch.common.xcontent.XContentFactory.jsonBuilder;
+import static org.opensearch.index.query.QueryBuilders.matchAllQuery;
 import static org.opensearch.index.query.QueryBuilders.matchQuery;
+import static org.opensearch.indices.replication.SegmentReplicationTarget.REPLICATION_PREFIX;
 import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
+import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAllSuccessful;
 import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;
 import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertSearchHits;
 
@@ -836,4 +867,342 @@ public void testPressureServiceStats() throws Exception {
             });
         }
     }
+
+    /**
+     * Tests that a scroll query opened on the replica preserves the segment files it references
+     * until the scroll is cleared.
+     * @throws Exception on test failure
+     */
+    public void testScrollCreatedOnReplica() throws Exception {
+        // create the cluster with one node holding the primary shard and a second node holding the replica shard
+        final String primary = internalCluster().startNode();
+        createIndex(INDEX_NAME);
+        ensureYellowAndNoInitializingShards(INDEX_NAME);
+        final String replica = internalCluster().startNode();
+        ensureGreen(INDEX_NAME);
+
+        // index 100 docs
+        for (int i = 0; i < 100; i++) {
+            client().prepareIndex(INDEX_NAME)
+                .setId(String.valueOf(i))
+                .setSource(jsonBuilder().startObject().field("field", i).endObject())
+                .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
+                .get();
+            refresh(INDEX_NAME);
+        }
+        assertBusy(
+            () -> assertEquals(
+                getIndexShard(primary, INDEX_NAME).getLatestReplicationCheckpoint().getSegmentInfosVersion(),
+                getIndexShard(replica, INDEX_NAME).getLatestReplicationCheckpoint().getSegmentInfosVersion()
+            )
+        );
+        final IndexShard replicaShard = getIndexShard(replica, INDEX_NAME);
+        final SegmentInfos segmentInfos = replicaShard.getLatestSegmentInfosAndCheckpoint().v1().get();
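+        // capture the segment files referenced by the replica's latest in-memory SegmentInfos; the scroll opened below must keep them on disk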
+        final Collection<String> snapshottedSegments = segmentInfos.files(false);
+        // open a scroll query before a flush is called.
+        // this tests scroll segment consistency between refresh and flush
+        SearchResponse searchResponse = client(replica).prepareSearch()
+            .setQuery(matchAllQuery())
+            .setIndices(INDEX_NAME)
+            .setRequestCache(false)
+            .setPreference("_only_local")
+            .setSearchType(SearchType.DFS_QUERY_THEN_FETCH)
+            .addSort("field", SortOrder.ASC)
+            .setSize(10)
+            .setScroll(TimeValue.timeValueDays(1))
+            .get();
+
+        // force a flush
+        flush(INDEX_NAME);
+
+        for (int i = 3; i < 50; i++) {
+            client().prepareDelete(INDEX_NAME, String.valueOf(i)).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get();
+            refresh(INDEX_NAME);
+            if (randomBoolean()) {
+                client().admin().indices().prepareForceMerge(INDEX_NAME).setMaxNumSegments(1).setFlush(true).get();
+                flush(INDEX_NAME);
+            }
+        }
+        assertBusy(() -> {
+            assertEquals(
+                getIndexShard(primary, INDEX_NAME).getLatestReplicationCheckpoint().getSegmentInfosVersion(),
+                getIndexShard(replica, INDEX_NAME).getLatestReplicationCheckpoint().getSegmentInfosVersion()
+            );
+        });
+
+        client().admin().indices().prepareForceMerge(INDEX_NAME).setMaxNumSegments(1).setFlush(true).get();
+        assertBusy(() -> {
+            assertEquals(
+                getIndexShard(primary, INDEX_NAME).getLatestReplicationCheckpoint().getSegmentInfosVersion(),
+                getIndexShard(replica, INDEX_NAME).getLatestReplicationCheckpoint().getSegmentInfosVersion()
+            );
+        });
+        // consume the scroll and verify that all hits are returned
+        logger.info("--> Collect all scroll query hits");
+        long scrollHits = 0;
+        do {
+            scrollHits += searchResponse.getHits().getHits().length;
+            searchResponse = client(replica).prepareSearchScroll(searchResponse.getScrollId()).setScroll(TimeValue.timeValueDays(1)).get();
+            assertAllSuccessful(searchResponse);
+        } while (searchResponse.getHits().getHits().length > 0);
+
+        List<String> currentFiles = List.of(replicaShard.store().directory().listAll());
+        assertTrue("Files should be preserved", currentFiles.containsAll(snapshottedSegments));
+
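+        // clearing the scroll releases the reader, dropping its file ref counts so the store can clean up the old segments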
+        client(replica).prepareClearScroll().addScrollId(searchResponse.getScrollId()).get();
+
+        currentFiles = List.of(replicaShard.store().directory().listAll());
+        assertFalse("Files should be cleaned up post scroll clear request", currentFiles.containsAll(snapshottedSegments));
+        assertEquals(100, scrollHits);
+    }
+
+    /**
+     * Tests that clearing a scroll query does not delete the temporary replication files that are part of
+     * an ongoing round of segment replication.
+     *
+     * @throws Exception on test failure
+     */
+    public void testScrollWithOngoingSegmentReplication() throws Exception {
+        // create the cluster with one node holding the primary shard and a second node holding the replica shard
+        final String primary = internalCluster().startNode();
+        prepareCreate(
+            INDEX_NAME,
+            Settings.builder()
+                // we want to control refreshes
+                .put("index.refresh_interval", -1)
+        ).get();
+        ensureYellowAndNoInitializingShards(INDEX_NAME);
+        final String replica = internalCluster().startNode();
+        ensureGreen(INDEX_NAME);
+
+        final int initialDocCount = 10;
+        final int finalDocCount = 20;
+        for (int i = 0; i < initialDocCount; i++) {
+            client().prepareIndex(INDEX_NAME)
+                .setId(String.valueOf(i))
+                .setSource(jsonBuilder().startObject().field("field", i).endObject())
+                .get();
+        }
+        // catch up replica with primary
+        refresh(INDEX_NAME);
+        assertBusy(
+            () -> assertEquals(
+                getIndexShard(primary, INDEX_NAME).getLatestReplicationCheckpoint().getSegmentInfosVersion(),
+                getIndexShard(replica, INDEX_NAME).getLatestReplicationCheckpoint().getSegmentInfosVersion()
+            )
+        );
+        logger.info("--> Create scroll query");
+        // open a scroll query before a flush is called.
+        SearchResponse searchResponse = client(replica).prepareSearch()
+            .setQuery(matchAllQuery())
+            .setIndices(INDEX_NAME)
+            .setRequestCache(false)
+            .setPreference("_only_local")
+            .setSearchType(SearchType.DFS_QUERY_THEN_FETCH)
+            .addSort("field", SortOrder.ASC)
+            .setSize(10)
+            .setScroll(TimeValue.timeValueDays(1))
+            .get();
+
+        // force a flush
+        flush(INDEX_NAME);
+
+        // Index more documents
+        for (int i = initialDocCount; i < finalDocCount; i++) {
+            client().prepareIndex(INDEX_NAME)
+                .setId(String.valueOf(i))
+                .setSource(jsonBuilder().startObject().field("field", i).endObject())
+                .get();
+        }
+        // Block the file copy operation so that the replica keeps a few temporary replication files on disk
+        CountDownLatch blockFileCopy = new CountDownLatch(1);
+        CountDownLatch waitForFileCopy = new CountDownLatch(1);
+        MockTransportService primaryTransportService = ((MockTransportService) internalCluster().getInstance(
+            TransportService.class,
+            primary
+        ));
+        primaryTransportService.addSendBehavior(
+            internalCluster().getInstance(TransportService.class, replica),
+            (connection, requestId, action, request, options) -> {
+                if (action.equals(SegmentReplicationTargetService.Actions.FILE_CHUNK)) {
+                    FileChunkRequest req = (FileChunkRequest) request;
+                    logger.debug("file chunk [{}] lastChunk: {}", req, req.lastChunk());
+                    if (req.name().endsWith("cfs") && req.lastChunk()) {
+                        try {
+                            waitForFileCopy.countDown();
+                            logger.info("--> Waiting for file copy");
+                            blockFileCopy.await();
+                        } catch (InterruptedException e) {
+                            throw new RuntimeException(e);
+                        }
+                    }
+                }
+                connection.sendRequest(requestId, action, request, options);
+            }
+        );
+
+        // perform a refresh to start a round of segment replication
+        refresh(INDEX_NAME);
+
+        // wait for segrep to start and copy temporary files
+        waitForFileCopy.await();
+
+        // verify replica contains temporary files
+        IndexShard replicaShard = getIndexShard(replica, INDEX_NAME);
+        List<String> temporaryFiles = Arrays.stream(replicaShard.store().directory().listAll())
+            .filter(fileName -> fileName.startsWith(REPLICATION_PREFIX))
+            .collect(Collectors.toList());
+        logger.info("--> temporaryFiles {}", temporaryFiles);
+        assertTrue(temporaryFiles.size() > 0);
+
+        // Clear the scroll query; the cleanup it triggers on the replica must not remove in-flight temporary replication files
+        client(replica).prepareClearScroll().addScrollId(searchResponse.getScrollId()).get();
+
+        // verify temporary files still exist
+        replicaShard = getIndexShard(replica, INDEX_NAME);
+        List<String> temporaryFilesPostClear = Arrays.stream(replicaShard.store().directory().listAll())
+            .filter(fileName -> fileName.startsWith(REPLICATION_PREFIX))
+            .collect(Collectors.toList());
+        logger.info("--> temporaryFilesPostClear {}", temporaryFilesPostClear);
+
+        // Unblock segment replication
+        blockFileCopy.countDown();
+
+        assertEquals(temporaryFiles.size(), temporaryFilesPostClear.size());
+        assertTrue(temporaryFilesPostClear.containsAll(temporaryFiles));
+
+        // wait for replica to catch up and verify doc count
+        assertBusy(() -> {
+            assertEquals(
+                getIndexShard(primary, INDEX_NAME).getLatestReplicationCheckpoint().getSegmentInfosVersion(),
+                getIndexShard(replica, INDEX_NAME).getLatestReplicationCheckpoint().getSegmentInfosVersion()
+            );
+        });
+        verifyStoreContent();
+        waitForSearchableDocs(finalDocCount, primary, replica);
+    }
+
+    public void testPitCreatedOnReplica() throws Exception {
+        final String primary = internalCluster().startNode();
+        createIndex(INDEX_NAME);
+        ensureYellowAndNoInitializingShards(INDEX_NAME);
+        final String replica = internalCluster().startNode();
+        ensureGreen(INDEX_NAME);
+        client().prepareIndex(INDEX_NAME)
+            .setId("1")
+            .setSource("foo", randomInt())
+            .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
+            .get();
+        refresh(INDEX_NAME);
+
+        client().prepareIndex(INDEX_NAME)
+            .setId("2")
+            .setSource("foo", randomInt())
+            .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
+            .get();
+        for (int i = 3; i < 100; i++) {
+            client().prepareIndex(INDEX_NAME)
+                .setId(String.valueOf(i))
+                .setSource("foo", randomInt())
+                .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
+                .get();
+            refresh(INDEX_NAME);
+        }
+        // wait until replication finishes, then make the PIT request.
+        assertBusy(
+            () -> assertEquals(
+                getIndexShard(primary, INDEX_NAME).getLatestReplicationCheckpoint().getSegmentInfosVersion(),
+                getIndexShard(replica, INDEX_NAME).getLatestReplicationCheckpoint().getSegmentInfosVersion()
+            )
+        );
+        CreatePitRequest request = new CreatePitRequest(TimeValue.timeValueDays(1), false);
+        request.setPreference("_only_local");
+        request.setIndices(new String[] { INDEX_NAME });
+        ActionFuture<CreatePitResponse> execute = client(replica).execute(CreatePitAction.INSTANCE, request);
+        CreatePitResponse pitResponse = execute.get();
+        SearchResponse searchResponse = client(replica).prepareSearch(INDEX_NAME)
+            .setSize(10)
+            .setPreference("_only_local")
+            .setRequestCache(false)
+            .addSort("foo", SortOrder.ASC)
+            .searchAfter(new Object[] { 30 })
+            .setPointInTime(new PointInTimeBuilder(pitResponse.getId()).setKeepAlive(TimeValue.timeValueDays(1)))
+            .get();
+        assertEquals(1, searchResponse.getSuccessfulShards());
+        assertEquals(1, searchResponse.getTotalShards());
+        FlushRequest flushRequest = Requests.flushRequest(INDEX_NAME);
+        client().admin().indices().flush(flushRequest).get();
+        final IndexShard replicaShard = getIndexShard(replica, INDEX_NAME);
+
+        // fetch the segments snapshotted when the reader context was created.
+        Collection<String> snapshottedSegments;
+        SearchService searchService = internalCluster().getInstance(SearchService.class, replica);
+        NamedWriteableRegistry registry = internalCluster().getInstance(NamedWriteableRegistry.class, replica);
+        final PitReaderContext pitReaderContext = searchService.getPitReaderContext(
+            decode(registry, pitResponse.getId()).shards().get(replicaShard.routingEntry().shardId()).getSearchContextId()
+        );
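+        // acquire the searcher held open by the PIT context to read the SegmentInfos it pins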
+        try (final Engine.Searcher searcher = pitReaderContext.acquireSearcher("test")) {
+            final StandardDirectoryReader standardDirectoryReader = NRTReplicationReaderManager.unwrapStandardReader(
+                (OpenSearchDirectoryReader) searcher.getDirectoryReader()
+            );
+            final SegmentInfos infos = standardDirectoryReader.getSegmentInfos();
+            snapshottedSegments = infos.files(false);
+        }
+
+        flush(INDEX_NAME);
+        for (int i = 101; i < 200; i++) {
+            client().prepareIndex(INDEX_NAME)
+                .setId(String.valueOf(i))
+                .setSource("foo", randomInt())
+                .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
+                .get();
+            refresh(INDEX_NAME);
+            if (randomBoolean()) {
+                client().admin().indices().prepareForceMerge(INDEX_NAME).setMaxNumSegments(1).setFlush(true).get();
+                flush(INDEX_NAME);
+            }
+        }
+        assertBusy(() -> {
+            assertEquals(
+                getIndexShard(primary, INDEX_NAME).getLatestReplicationCheckpoint().getSegmentInfosVersion(),
+                getIndexShard(replica, INDEX_NAME).getLatestReplicationCheckpoint().getSegmentInfosVersion()
+            );
+        });
+
+        client().admin().indices().prepareForceMerge(INDEX_NAME).setMaxNumSegments(1).setFlush(true).get();
+        assertBusy(() -> {
+            assertEquals(
+                getIndexShard(primary, INDEX_NAME).getLatestReplicationCheckpoint().getSegmentInfosVersion(),
+                getIndexShard(replica, INDEX_NAME).getLatestReplicationCheckpoint().getSegmentInfosVersion()
+            );
+        });
+        // Test stats
+        IndicesStatsRequest indicesStatsRequest = new IndicesStatsRequest();
+        indicesStatsRequest.indices(INDEX_NAME);
+        indicesStatsRequest.all();
+        IndicesStatsResponse indicesStatsResponse = client().admin().indices().stats(indicesStatsRequest).get();
+        long pitCurrent = indicesStatsResponse.getIndex(INDEX_NAME).getTotal().search.getTotal().getPitCurrent();
+        long openContexts = indicesStatsResponse.getIndex(INDEX_NAME).getTotal().search.getOpenContexts();
+        assertEquals(1, pitCurrent);
+        assertEquals(1, openContexts);
+        SearchResponse resp = client(replica).prepareSearch(INDEX_NAME)
+            .setSize(10)
+            .setPreference("_only_local")
+            .addSort("foo", SortOrder.ASC)
+            .searchAfter(new Object[] { 30 })
+            .setPointInTime(new PointInTimeBuilder(pitResponse.getId()).setKeepAlive(TimeValue.timeValueDays(1)))
+            .setRequestCache(false)
+            .get();
+        PitTestsUtil.assertUsingGetAllPits(client(replica), pitResponse.getId(), pitResponse.getCreationTime());
+        assertSegments(false, INDEX_NAME, 1, client(replica), pitResponse.getId());
+
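+        // the open PIT still references the snapshotted segments, so they must remain in the store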
+        List<String> currentFiles = List.of(replicaShard.store().directory().listAll());
+        assertTrue("Files should be preserved", currentFiles.containsAll(snapshottedSegments));
+
+        // delete the PIT
+        DeletePitRequest deletePITRequest = new DeletePitRequest(pitResponse.getId());
+        client().execute(DeletePitAction.INSTANCE, deletePITRequest).actionGet();
+
+        currentFiles = List.of(replicaShard.store().directory().listAll());
+        assertFalse("Files should be cleaned up", currentFiles.containsAll(snapshottedSegments));
+    }
 }
diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java
index 0ff0eeba7d394..6ba8e5d893bc0 100644
--- a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java
+++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java
@@ -162,7 +162,7 @@ private void setWeightFunction(float indexBalance, float shardBalanceFactor) {
 
     /**
      * When primary shards balance is desired, enable primary shard balancing constraints
-     * @param preferPrimaryShardBalance
+     * @param preferPrimaryShardBalance boolean to prefer balancing by primary shard
      */
     private void setPreferPrimaryShardBalance(boolean preferPrimaryShardBalance) {
         this.preferPrimaryShardBalance = preferPrimaryShardBalance;
diff --git a/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java b/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java
index 8071e94d1426d..da3f914d8bd7e 100644
--- a/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java
+++ b/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java
@@ -70,7 +70,7 @@ public NRTReplicationEngine(EngineConfig engineConfig) {
         WriteOnlyTranslogManager translogManagerRef = null;
         try {
             lastCommittedSegmentInfos = store.readLastCommittedSegmentsInfo();
-            readerManager = new NRTReplicationReaderManager(OpenSearchDirectoryReader.wrap(getDirectoryReader(), shardId));
+            readerManager = buildReaderManager();
             final SequenceNumbers.CommitInfo commitInfo = SequenceNumbers.loadSeqNoInfoFromLuceneCommit(
                 this.lastCommittedSegmentInfos.getUserData().entrySet()
             );
@@ -121,6 +121,28 @@ public void onAfterTranslogSync() {
         }
     }
 
+    private NRTReplicationReaderManager buildReaderManager() throws IOException {
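+        // onNewReader increments ref counts for the files referenced by each newly opened reader;
+        // onReaderClosed decrements them and then attempts a best-effort store cleanup.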
+        return new NRTReplicationReaderManager(
+            OpenSearchDirectoryReader.wrap(getDirectoryReader(), shardId),
+            store::incRefFileDeleter,
+            (files) -> {
+                store.decRefFileDeleter(files);
+                try {
+                    store.cleanupAndPreserveLatestCommitPoint(
+                        "On reader closed",
+                        getLatestSegmentInfos(),
+                        getLastCommittedSegmentInfos(),
+                        false
+                    );
+                } catch (IOException e) {
+                    // Log but do not rethrow - we can try cleaning up again after the next replication cycle.
+                    // If that cleanup were to fail, the shard would fail as well.
+                    logger.error("Unable to clean store after reader closed", e);
+                }
+            }
+        );
+    }
+
     public TranslogManager translogManager() {
         return translogManager;
     }
diff --git a/server/src/main/java/org/opensearch/index/engine/NRTReplicationReaderManager.java b/server/src/main/java/org/opensearch/index/engine/NRTReplicationReaderManager.java
index 00748acb1d76d..9ec484ebfd383 100644
--- a/server/src/main/java/org/opensearch/index/engine/NRTReplicationReaderManager.java
+++ b/server/src/main/java/org/opensearch/index/engine/NRTReplicationReaderManager.java
@@ -22,8 +22,10 @@
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
 import java.util.Objects;
+import java.util.function.Consumer;
 
 /**
  * This is an extension of {@link OpenSearchReaderManager} for use with {@link NRTReplicationEngine}.
@@ -35,17 +37,27 @@ public class NRTReplicationReaderManager extends OpenSearchReaderManager {
 
     private final static Logger logger = LogManager.getLogger(NRTReplicationReaderManager.class);
     private volatile SegmentInfos currentInfos;
+    private Consumer<Collection<String>> onReaderClosed;
+    private Consumer<Collection<String>> onNewReader;
 
     /**
      * Creates and returns a new SegmentReplicationReaderManager from the given
      * already-opened {@link OpenSearchDirectoryReader}, stealing
      * the incoming reference.
      *
-     * @param reader the SegmentReplicationReaderManager to use for future reopens
+     * @param reader         - The OpenSearchDirectoryReader to use for future reopens.
+     * @param onNewReader    - Called when a new reader is created.
+     * @param onReaderClosed - Called when a reader is closed.
      */
-    NRTReplicationReaderManager(OpenSearchDirectoryReader reader) {
+    NRTReplicationReaderManager(
+        OpenSearchDirectoryReader reader,
+        Consumer<Collection<String>> onNewReader,
+        Consumer<Collection<String>> onReaderClosed
+    ) {
         super(reader);
         currentInfos = unwrapStandardReader(reader).getSegmentInfos();
+        this.onNewReader = onNewReader;
+        this.onReaderClosed = onReaderClosed;
     }
 
     @Override
@@ -60,6 +72,7 @@ protected OpenSearchDirectoryReader refreshIfNeeded(OpenSearchDirectoryReader re
         for (LeafReaderContext ctx : standardDirectoryReader.leaves()) {
             subs.add(ctx.reader());
         }
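+        // capture the files referenced by the new SegmentInfos so they stay ref-counted for the lifetime of the new reader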
+        final Collection<String> files = currentInfos.files(false);
         DirectoryReader innerReader = StandardDirectoryReader.open(referenceToRefresh.directory(), currentInfos, subs, null);
         final DirectoryReader softDeletesDirectoryReaderWrapper = new SoftDeletesDirectoryReaderWrapper(
             innerReader,
@@ -68,7 +81,13 @@ protected OpenSearchDirectoryReader refreshIfNeeded(OpenSearchDirectoryReader re
         logger.trace(
             () -> new ParameterizedMessage("updated to SegmentInfosVersion=" + currentInfos.getVersion() + " reader=" + innerReader)
         );
-        return OpenSearchDirectoryReader.wrap(softDeletesDirectoryReaderWrapper, referenceToRefresh.shardId());
+        final OpenSearchDirectoryReader reader = OpenSearchDirectoryReader.wrap(
+            softDeletesDirectoryReaderWrapper,
+            referenceToRefresh.shardId()
+        );
+        onNewReader.accept(files);
+        OpenSearchDirectoryReader.addReaderCloseListener(reader, key -> onReaderClosed.accept(files));
+        return reader;
     }
 
     /**
@@ -89,7 +108,7 @@ public SegmentInfos getSegmentInfos() {
         return currentInfos;
     }
 
-    private StandardDirectoryReader unwrapStandardReader(OpenSearchDirectoryReader reader) {
+    public static StandardDirectoryReader unwrapStandardReader(OpenSearchDirectoryReader reader) {
         final DirectoryReader delegate = reader.getDelegate();
         if (delegate instanceof SoftDeletesDirectoryReaderWrapper) {
             return (StandardDirectoryReader) ((SoftDeletesDirectoryReaderWrapper) delegate).getDelegate();
diff --git a/server/src/main/java/org/opensearch/index/store/ReplicaFileTracker.java b/server/src/main/java/org/opensearch/index/store/ReplicaFileTracker.java
new file mode 100644
index 0000000000000..0ec282619337c
--- /dev/null
+++ b/server/src/main/java/org/opensearch/index/store/ReplicaFileTracker.java
@@ -0,0 +1,51 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.index.store;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * This class is a version of Lucene's ReplicaFileDeleter class used to keep track of
+ * segment files that should be preserved on replicas between replication events.
+ * The difference is that this component does not actually perform any deletions; it only tracks reference counts.
+ * Deletions are made through Store.java.
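+ * Typical usage: incRef the files referenced by a newly opened reader, decRef them when that reader
+ * closes, and consult canDelete during store cleanup to skip files that are still referenced.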
+ *
+ * https://github.com/apache/lucene/blob/main/lucene/replicator/src/java/org/apache/lucene/replicator/nrt/ReplicaFileDeleter.java
+ *
+ * @opensearch.internal
+ */
+final class ReplicaFileTracker {
+
+    private final Map<String, Integer> refCounts = new HashMap<>();
+
+    public synchronized void incRef(Collection<String> fileNames) {
+        for (String fileName : fileNames) {
+            refCounts.merge(fileName, 1, Integer::sum);
+        }
+    }
+
+    public synchronized void decRef(Collection<String> fileNames) {
+        for (String fileName : fileNames) {
+            Integer curCount = refCounts.get(fileName);
+            assert curCount != null : "fileName=" + fileName;
+            assert curCount > 0;
+            if (curCount == 1) {
+                refCounts.remove(fileName);
+            } else {
+                refCounts.put(fileName, curCount - 1);
+            }
+        }
+    }
+
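+    // a file can be deleted only when no tracked reader still references it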
+    public synchronized boolean canDelete(String fileName) {
+        return refCounts.containsKey(fileName) == false;
+    }
+}
diff --git a/server/src/main/java/org/opensearch/index/store/Store.java b/server/src/main/java/org/opensearch/index/store/Store.java
index 69de85cd23820..f923532b3d9ad 100644
--- a/server/src/main/java/org/opensearch/index/store/Store.java
+++ b/server/src/main/java/org/opensearch/index/store/Store.java
@@ -124,6 +124,7 @@
 import static java.util.Collections.unmodifiableMap;
 import static org.opensearch.index.seqno.SequenceNumbers.LOCAL_CHECKPOINT_KEY;
 import static org.opensearch.index.store.Store.MetadataSnapshot.loadMetadata;
+import static org.opensearch.indices.replication.SegmentReplicationTarget.REPLICATION_PREFIX;
 
 /**
  * A Store provides plain access to files written by an opensearch index shard. Each shard
@@ -182,6 +183,10 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
     private final ShardLock shardLock;
     private final OnClose onClose;
 
+    // used to ref count files when a new Reader is opened for PIT/Scroll queries
+    // prevents segment file deletion until the PIT/Scroll expires or is discarded
+    private final ReplicaFileTracker replicaFileTracker;
+
     private final AbstractRefCounted refCounter = new AbstractRefCounted("store") {
         @Override
         protected void closeInternal() {
@@ -202,6 +207,7 @@ public Store(ShardId shardId, IndexSettings indexSettings, Directory directory,
         this.directory = new StoreDirectory(sizeCachingDir, Loggers.getLogger("index.store.deletes", shardId));
         this.shardLock = shardLock;
         this.onClose = onClose;
+        this.replicaFileTracker = indexSettings.isSegRepEnabled() ? new ReplicaFileTracker() : null;
 
         assert onClose != null;
         assert shardLock != null;
@@ -782,9 +788,10 @@ public void cleanupAndVerify(String reason, MetadataSnapshot sourceMetadata) thr
     }
 
     /**
-     * Segment Replication method -
+     * Segment Replication method
      * This method deletes every file in this store that is not referenced by the passed in SegmentInfos or
      * part of the latest on-disk commit point.
+     *
      * This method is used for segment replication when the in memory SegmentInfos can be ahead of the on disk segment file.
      * In this case files from both snapshots must be preserved. Verification has been done that all files are present on disk.
      * @param reason         the reason for this cleanup operation logged for each deleted file
@@ -792,24 +799,59 @@ public void cleanupAndVerify(String reason, MetadataSnapshot sourceMetadata) thr
      * @throws IllegalStateException if the latest snapshot in this store differs from the given one after the cleanup.
      */
     public void cleanupAndPreserveLatestCommitPoint(String reason, SegmentInfos infos) throws IOException {
+        this.cleanupAndPreserveLatestCommitPoint(reason, infos, readLastCommittedSegmentsInfo(), true);
+    }
+
+    /**
+     * Segment Replication method
+     *
+     * Similar to {@link Store#cleanupAndPreserveLatestCommitPoint(String, SegmentInfos)} with extra parameters for cleanup
+     *
+     * This method deletes every file in this store, except:
+     *  1. Files referenced by the passed in SegmentInfos, usually the in-memory segment infos copied from the primary
+     *  2. Files that are part of the passed in last committed segment infos
+     *  3. Files ref-counted by active readers for PIT/scroll queries
+     *  4. Temporary replication files, when deleteTempFiles is false
+     *
+     * @param reason         the reason for this cleanup operation logged for each deleted file
+     * @param infos          {@link SegmentInfos} files from these infos will be preserved on disk if present
+     * @param lastCommittedSegmentInfos {@link SegmentInfos} last committed segment infos
+     * @param deleteTempFiles whether this cleanup should delete temporary replication files
+     *
+     * @throws IllegalStateException if the latest snapshot in this store differs from the given one after the cleanup.
+     */
+    public void cleanupAndPreserveLatestCommitPoint(
+        String reason,
+        SegmentInfos infos,
+        SegmentInfos lastCommittedSegmentInfos,
+        boolean deleteTempFiles
+    ) throws IOException {
         assert indexSettings.isSegRepEnabled();
         // fetch a snapshot from the latest on disk Segments_N file. This can be behind
         // the passed in local in memory snapshot, so we want to ensure files it references are not removed.
         metadataLock.writeLock().lock();
         try (Lock writeLock = directory.obtainLock(IndexWriter.WRITE_LOCK_NAME)) {
-            cleanupFiles(reason, getMetadata(readLastCommittedSegmentsInfo()), infos.files(true));
+            cleanupFiles(reason, getMetadata(lastCommittedSegmentInfos), infos.files(true), deleteTempFiles);
         } finally {
             metadataLock.writeLock().unlock();
         }
     }
 
-    private void cleanupFiles(String reason, MetadataSnapshot localSnapshot, @Nullable Collection<String> additionalFiles)
-        throws IOException {
+    private void cleanupFiles(
+        String reason,
+        MetadataSnapshot localSnapshot,
+        @Nullable Collection<String> additionalFiles,
+        boolean deleteTempFiles
+    ) throws IOException {
         assert metadataLock.isWriteLockedByCurrentThread();
         for (String existingFile : directory.listAll()) {
             if (Store.isAutogenerated(existingFile)
                 || localSnapshot.contains(existingFile)
-                || (additionalFiles != null && additionalFiles.contains(existingFile))) {
+                || (additionalFiles != null && additionalFiles.contains(existingFile))
+                // also ensure we are not deleting a file referenced by an active reader.
+                || replicaFileTracker != null && replicaFileTracker.canDelete(existingFile) == false
+                // prevent temporary file deletion during reader cleanup
+                || deleteTempFiles == false && existingFile.startsWith(REPLICATION_PREFIX)) {
                 // don't delete snapshot file, or the checksums file (note, this is extra protection since the Store won't delete
                 // checksum)
                 continue;
@@ -1909,4 +1951,16 @@ private static IndexWriterConfig newIndexWriterConfig() {
             // we also don't specify a codec here and merges should use the engines for this index
             .setMergePolicy(NoMergePolicy.INSTANCE);
     }
+
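+    // Increments ref counts for the given files so store cleanup preserves them while a reader references them (segrep only).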
+    public void incRefFileDeleter(Collection<String> files) {
+        if (this.indexSettings.isSegRepEnabled()) {
+            this.replicaFileTracker.incRef(files);
+        }
+    }
+
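+    // Decrements ref counts for the given files once the referencing reader has closed (segrep only).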
+    public void decRefFileDeleter(Collection<String> files) {
+        if (this.indexSettings.isSegRepEnabled()) {
+            this.replicaFileTracker.decRef(files);
+        }
+    }
 }
diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java
index 03d67f4aa2313..995ec58d8768f 100644
--- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java
+++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java
@@ -51,6 +51,8 @@ public class SegmentReplicationTarget extends ReplicationTarget {
     private final SegmentReplicationState state;
     protected final MultiFileWriter multiFileWriter;
 
+    public final static String REPLICATION_PREFIX = "replication.";
+
     public ReplicationCheckpoint getCheckpoint() {
         return this.checkpoint;
     }
@@ -85,7 +87,7 @@ protected void closeInternal() {
 
     @Override
     protected String getPrefix() {
-        return "replication." + UUIDs.randomBase64UUID() + ".";
+        return REPLICATION_PREFIX + UUIDs.randomBase64UUID() + ".";
     }
 
     @Override