From 56ef936674972baab284302bc2c25dd73added70 Mon Sep 17 00:00:00 2001 From: Poojita Raj Date: Mon, 7 Mar 2022 09:44:36 -0800 Subject: [PATCH] docrep working Signed-off-by: Poojita Raj --- .../index/engine/InternalEngine.java | 10 +-- .../opensearch/index/shard/IndexShard.java | 70 ++++++++++--------- .../indices/recovery/MultiFileWriter.java | 5 +- 3 files changed, 47 insertions(+), 38 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java index 48746e34f5f63..6de79a8e91057 100644 --- a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java @@ -283,11 +283,11 @@ public InternalEngine(EngineConfig engineConfig) { historyUUID = null; forceMergeUUID = null; } else { - writer = createWriter(); - bootstrapAppendOnlyInfoFromWriter(writer); - final Map commitData = commitDataAsMap(writer); - historyUUID = loadHistoryUUID(commitData); - forceMergeUUID = commitData.get(FORCE_MERGE_UUID_KEY); + writer = createWriter(); + bootstrapAppendOnlyInfoFromWriter(writer); + final Map commitData = commitDataAsMap(writer); + historyUUID = loadHistoryUUID(commitData); + forceMergeUUID = commitData.get(FORCE_MERGE_UUID_KEY); } indexWriter = writer; } catch (IOException | TranslogCorruptedException e) { diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 886f31457bbf4..5d6e98b90fa2d 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -1277,7 +1277,7 @@ public void refresh(String source) { */ public long getWritingBytes() { // TODO: Segrep: hack - if not the primary our IW is null and this blows up. - if (shardRouting.primary() == false) { + if (indexSettings.isSegrepEnabled() && (shardRouting.primary() == false)) { return 0L; } Engine engine = getEngineOrNull(); @@ -2029,7 +2029,9 @@ public void openEngineAndRecoverFromTranslog() throws IOException { public void openEngineAndSkipTranslogRecovery() throws IOException { assert routingEntry().recoverySource().getType() == RecoverySource.Type.PEER : "not a peer recovery [" + routingEntry() + "]"; // TODO: Segrep - fix initial recovery stages from ReplicationTarget. - // recoveryState.validateCurrentStage(RecoveryState.Stage.TRANSLOG); + if (indexSettings.isSegrepEnabled() == false) { + recoveryState.validateCurrentStage(RecoveryState.Stage.TRANSLOG); + } loadGlobalCheckpointToReplicationTracker(); innerOpenEngineAndTranslog(replicationTracker); getEngine().skipTranslogRecovery(); @@ -2066,7 +2068,9 @@ private void innerOpenEngineAndTranslog(LongSupplier globalCheckpointSupplier) t // which settings changes could possibly have happened, so here we forcefully push any config changes to the new engine. onSettingsChanged(); // TODO: Segrep - Fix - // assert assertSequenceNumbersInCommit(); + if (indexSettings.isSegrepEnabled() == false) { + assert assertSequenceNumbersInCommit(); + } recoveryState.validateCurrentStage(RecoveryState.Stage.TRANSLOG); } @@ -2238,7 +2242,7 @@ protected final void verifyActive() throws IllegalIndexShardStateException { * Returns number of heap bytes used by the indexing buffer for this shard, or 0 if the shard is closed */ public long getIndexBufferRAMBytesUsed() { - if (shardRouting.primary() == false) { + if (indexSettings.isSegrepEnabled() && (shardRouting.primary() == false)) { return 0; } Engine engine = getEngineOrNull(); @@ -2714,7 +2718,9 @@ public void syncRetentionLeases() { assert assertPrimaryMode(); verifyNotClosed(); // TODO: Segrep - Fix retention leases - // replicationTracker.renewPeerRecoveryRetentionLeases(); + if (indexSettings.isSegrepEnabled() == false) { + replicationTracker.renewPeerRecoveryRetentionLeases(); + } final Tuple retentionLeases = getRetentionLeases(true); if (retentionLeases.v1()) { logger.trace("syncing retention leases [{}] after expiration check", retentionLeases.v2()); @@ -3303,31 +3309,31 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) { } else { internalRefreshListener = Collections.singletonList(new RefreshMetricUpdater(refreshMetric)); } - return this.engineConfigFactory.newEngineConfig( - shardId, - threadPool, - indexSettings, - warmer, - store, - indexSettings.getMergePolicy(), - mapperService != null ? mapperService.indexAnalyzer() : null, - similarityService.similarity(mapperService), - codecService, - shardEventListener, - indexCache != null ? indexCache.query() : null, - cachingPolicy, - translogConfig, - IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING.get(indexSettings.getSettings()), - Arrays.asList(refreshListeners, refreshPendingLocationListener), - internalRefreshListener, - indexSort, - circuitBreakerService, - globalCheckpointSupplier, - replicationTracker::getRetentionLeases, - () -> getOperationPrimaryTerm(), - tombstoneDocSupplier(), - shardRouting.primary() - ); + return this.engineConfigFactory.newEngineConfig( + shardId, + threadPool, + indexSettings, + warmer, + store, + indexSettings.getMergePolicy(), + mapperService != null ? mapperService.indexAnalyzer() : null, + similarityService.similarity(mapperService), + codecService, + shardEventListener, + indexCache != null ? indexCache.query() : null, + cachingPolicy, + translogConfig, + IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING.get(indexSettings.getSettings()), + Arrays.asList(refreshListeners, refreshPendingLocationListener), + internalRefreshListener, + indexSort, + circuitBreakerService, + globalCheckpointSupplier, + replicationTracker::getRetentionLeases, + () -> getOperationPrimaryTerm(), + tombstoneDocSupplier(), + shardRouting.primary() + ); } /** @@ -3918,8 +3924,8 @@ ReplicationTracker getReplicationTracker() { public boolean scheduledRefresh() { // skip if not primary shard. // TODO: Segrep - should split into primary/replica classes. - if ((indexSettings.isSegrepEnabled()) &&(shardRouting.primary() == false) ) { - return false; + if ((indexSettings.isSegrepEnabled()) && (shardRouting.primary() == false)) { + return false; } verifyNotClosed(); boolean listenerNeedsRefresh = refreshListeners.refreshNeeded(); diff --git a/server/src/main/java/org/opensearch/indices/recovery/MultiFileWriter.java b/server/src/main/java/org/opensearch/indices/recovery/MultiFileWriter.java index fe35711547fce..e88d123f50679 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/MultiFileWriter.java +++ b/server/src/main/java/org/opensearch/indices/recovery/MultiFileWriter.java @@ -48,6 +48,7 @@ import java.io.IOException; import java.util.Arrays; +import java.util.Collections; import java.util.Comparator; import java.util.Iterator; import java.util.Map; @@ -151,7 +152,9 @@ private void innerWriteFileChunk(StoreFileMetadata fileMetadata, long position, + Arrays.toString(store.directory().listAll()); // TODO: Segrep - toggle this with a setting. With segrep we don't want this fsync we will only fsync // when a new checkpoint is received. - // store.directory().sync(Collections.singleton(temporaryFileName)); + if (store.indexSettings().isSegrepEnabled() == false) { + store.directory().sync(Collections.singleton(temporaryFileName)); + } IndexOutput remove = removeOpenIndexOutputs(name); assert remove == null || remove == indexOutput; // remove maybe null if we got finished }