Skip to content

Commit

Permalink
docrep working
Browse files Browse the repository at this point in the history
Signed-off-by: Poojita Raj <[email protected]>
  • Loading branch information
Poojita-Raj committed Mar 7, 2022
1 parent cd03754 commit 56ef936
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -283,11 +283,11 @@ public InternalEngine(EngineConfig engineConfig) {
historyUUID = null;
forceMergeUUID = null;
} else {
writer = createWriter();
bootstrapAppendOnlyInfoFromWriter(writer);
final Map<String, String> commitData = commitDataAsMap(writer);
historyUUID = loadHistoryUUID(commitData);
forceMergeUUID = commitData.get(FORCE_MERGE_UUID_KEY);
writer = createWriter();
bootstrapAppendOnlyInfoFromWriter(writer);
final Map<String, String> commitData = commitDataAsMap(writer);
historyUUID = loadHistoryUUID(commitData);
forceMergeUUID = commitData.get(FORCE_MERGE_UUID_KEY);
}
indexWriter = writer;
} catch (IOException | TranslogCorruptedException e) {
Expand Down
70 changes: 38 additions & 32 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -1277,7 +1277,7 @@ public void refresh(String source) {
*/
public long getWritingBytes() {
// TODO: Segrep: hack - if not the primary our IW is null and this blows up.
if (shardRouting.primary() == false) {
if (indexSettings.isSegrepEnabled() && (shardRouting.primary() == false)) {
return 0L;
}
Engine engine = getEngineOrNull();
Expand Down Expand Up @@ -2029,7 +2029,9 @@ public void openEngineAndRecoverFromTranslog() throws IOException {
public void openEngineAndSkipTranslogRecovery() throws IOException {
assert routingEntry().recoverySource().getType() == RecoverySource.Type.PEER : "not a peer recovery [" + routingEntry() + "]";
// TODO: Segrep - fix initial recovery stages from ReplicationTarget.
// recoveryState.validateCurrentStage(RecoveryState.Stage.TRANSLOG);
if (indexSettings.isSegrepEnabled() == false) {
recoveryState.validateCurrentStage(RecoveryState.Stage.TRANSLOG);
}
loadGlobalCheckpointToReplicationTracker();
innerOpenEngineAndTranslog(replicationTracker);
getEngine().skipTranslogRecovery();
Expand Down Expand Up @@ -2066,7 +2068,9 @@ private void innerOpenEngineAndTranslog(LongSupplier globalCheckpointSupplier) t
// which settings changes could possibly have happened, so here we forcefully push any config changes to the new engine.
onSettingsChanged();
// TODO: Segrep - Fix
// assert assertSequenceNumbersInCommit();
if (indexSettings.isSegrepEnabled() == false) {
assert assertSequenceNumbersInCommit();
}
recoveryState.validateCurrentStage(RecoveryState.Stage.TRANSLOG);
}

Expand Down Expand Up @@ -2238,7 +2242,7 @@ protected final void verifyActive() throws IllegalIndexShardStateException {
* Returns number of heap bytes used by the indexing buffer for this shard, or 0 if the shard is closed
*/
public long getIndexBufferRAMBytesUsed() {
if (shardRouting.primary() == false) {
if (indexSettings.isSegrepEnabled() && (shardRouting.primary() == false)) {
return 0;
}
Engine engine = getEngineOrNull();
Expand Down Expand Up @@ -2714,7 +2718,9 @@ public void syncRetentionLeases() {
assert assertPrimaryMode();
verifyNotClosed();
// TODO: Segrep - Fix retention leases
// replicationTracker.renewPeerRecoveryRetentionLeases();
if (indexSettings.isSegrepEnabled() == false) {
replicationTracker.renewPeerRecoveryRetentionLeases();
}
final Tuple<Boolean, RetentionLeases> retentionLeases = getRetentionLeases(true);
if (retentionLeases.v1()) {
logger.trace("syncing retention leases [{}] after expiration check", retentionLeases.v2());
Expand Down Expand Up @@ -3303,31 +3309,31 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) {
} else {
internalRefreshListener = Collections.singletonList(new RefreshMetricUpdater(refreshMetric));
}
return this.engineConfigFactory.newEngineConfig(
shardId,
threadPool,
indexSettings,
warmer,
store,
indexSettings.getMergePolicy(),
mapperService != null ? mapperService.indexAnalyzer() : null,
similarityService.similarity(mapperService),
codecService,
shardEventListener,
indexCache != null ? indexCache.query() : null,
cachingPolicy,
translogConfig,
IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING.get(indexSettings.getSettings()),
Arrays.asList(refreshListeners, refreshPendingLocationListener),
internalRefreshListener,
indexSort,
circuitBreakerService,
globalCheckpointSupplier,
replicationTracker::getRetentionLeases,
() -> getOperationPrimaryTerm(),
tombstoneDocSupplier(),
shardRouting.primary()
);
return this.engineConfigFactory.newEngineConfig(
shardId,
threadPool,
indexSettings,
warmer,
store,
indexSettings.getMergePolicy(),
mapperService != null ? mapperService.indexAnalyzer() : null,
similarityService.similarity(mapperService),
codecService,
shardEventListener,
indexCache != null ? indexCache.query() : null,
cachingPolicy,
translogConfig,
IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING.get(indexSettings.getSettings()),
Arrays.asList(refreshListeners, refreshPendingLocationListener),
internalRefreshListener,
indexSort,
circuitBreakerService,
globalCheckpointSupplier,
replicationTracker::getRetentionLeases,
() -> getOperationPrimaryTerm(),
tombstoneDocSupplier(),
shardRouting.primary()
);
}

/**
Expand Down Expand Up @@ -3918,8 +3924,8 @@ ReplicationTracker getReplicationTracker() {
public boolean scheduledRefresh() {
// skip if not primary shard.
// TODO: Segrep - should split into primary/replica classes.
if ((indexSettings.isSegrepEnabled()) &&(shardRouting.primary() == false) ) {
return false;
if ((indexSettings.isSegrepEnabled()) && (shardRouting.primary() == false)) {
return false;
}
verifyNotClosed();
boolean listenerNeedsRefresh = refreshListeners.refreshNeeded();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@

import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.Map;
Expand Down Expand Up @@ -151,7 +152,9 @@ private void innerWriteFileChunk(StoreFileMetadata fileMetadata, long position,
+ Arrays.toString(store.directory().listAll());
// TODO: Segrep - toggle this with a setting. With segrep we don't want this fsync we will only fsync
// when a new checkpoint is received.
// store.directory().sync(Collections.singleton(temporaryFileName));
if (store.indexSettings().isSegrepEnabled() == false) {
store.directory().sync(Collections.singleton(temporaryFileName));
}
IndexOutput remove = removeOpenIndexOutputs(name);
assert remove == null || remove == indexOutput; // remove maybe null if we got finished
}
Expand Down

0 comments on commit 56ef936

Please sign in to comment.