Skip to content

Commit

Permalink
toggle
Browse files Browse the repository at this point in the history
Signed-off-by: Poojita Raj <[email protected]>

index writer null exception

Signed-off-by: Poojita Raj <[email protected]>

included conditional logic

Signed-off-by: Poojita Raj <[email protected]>

internal listener fix

Signed-off-by: Poojita Raj <[email protected]>
  • Loading branch information
Poojita-Raj committed Mar 7, 2022
1 parent 4d25000 commit cd03754
Show file tree
Hide file tree
Showing 5 changed files with 86 additions and 74 deletions.
4 changes: 1 addition & 3 deletions server/src/main/java/org/opensearch/index/IndexService.java
Original file line number Diff line number Diff line change
Expand Up @@ -915,9 +915,7 @@ private void maybeRefreshEngine(boolean force) {
if (indexSettings.getRefreshInterval().millis() > 0 || force) {
for (IndexShard shard : this.shards.values()) {
try {
if (shard.routingEntry().primary()) {
shard.scheduledRefresh();
}
shard.scheduledRefresh();
} catch (IndexShardClosedException | AlreadyClosedException ex) {
// fine - continue;
}
Expand Down
6 changes: 5 additions & 1 deletion server/src/main/java/org/opensearch/index/IndexSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -653,6 +653,10 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti
this.indexMetadata = indexMetadata;
numberOfShards = settings.getAsInt(IndexMetadata.SETTING_NUMBER_OF_SHARDS, null);
isSegrepEnabled = settings.getAsBoolean(IndexMetadata.SETTING_SEGMENT_REPLICATION, false);
<<<<<<< HEAD
=======

>>>>>>> 5950974c5a2 (toggle)
this.searchThrottled = INDEX_SEARCH_THROTTLED.get(settings);
this.queryStringLenient = QUERY_STRING_LENIENT_SETTING.get(settings);
this.queryStringAnalyzeWildcard = QUERY_STRING_ANALYZE_WILDCARD.get(nodeSettings);
Expand Down Expand Up @@ -883,7 +887,7 @@ public int getNumberOfReplicas() {
}

public boolean isSegrepEnabled() {
return isSegrepEnabled;
return Boolean.TRUE.equals(isSegrepEnabled);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -277,17 +277,17 @@ public InternalEngine(EngineConfig engineConfig) {
);
this.localCheckpointTracker = createLocalCheckpointTracker(localCheckpointTrackerSupplier);
// TODO: Segrep - should have a separate read only engine rather than all this conditional logic.
if (engineConfig.isPrimary()) {
writer = createWriter();
bootstrapAppendOnlyInfoFromWriter(writer);
final Map<String, String> commitData = commitDataAsMap(writer);
historyUUID = loadHistoryUUID(commitData);
forceMergeUUID = commitData.get(FORCE_MERGE_UUID_KEY);
} else {
if ((engineConfig.getIndexSettings().isSegrepEnabled()) && engineConfig.isPrimary() == false) {
// Segrep - hack to make this engine read only and not use
writer = null;
historyUUID = null;
forceMergeUUID = null;
} else {
writer = createWriter();
bootstrapAppendOnlyInfoFromWriter(writer);
final Map<String, String> commitData = commitDataAsMap(writer);
historyUUID = loadHistoryUUID(commitData);
forceMergeUUID = commitData.get(FORCE_MERGE_UUID_KEY);
}
indexWriter = writer;
} catch (IOException | TranslogCorruptedException e) {
Expand Down Expand Up @@ -582,7 +582,8 @@ private void recoverFromTranslogInternal(TranslogRecoveryRunner translogRecovery
translog.currentFileGeneration()
)
);
if (engineConfig.isPrimary()) {
boolean segrep = engineConfig.getIndexSettings().isSegrepEnabled();
if ((segrep == false) || (segrep && engineConfig.isPrimary())) {
flush(false, true);
}
translog.trimUnreferencedReaders();
Expand Down Expand Up @@ -652,7 +653,8 @@ public Translog.Location getTranslogLastWriteLocation() {

private void revisitIndexDeletionPolicyOnTranslogSynced() throws IOException {
if (combinedDeletionPolicy.hasUnreferencedCommits()) {
if (engineConfig.isPrimary()) {
boolean segrep = engineConfig.getIndexSettings().isSegrepEnabled();
if ((segrep == false) || (segrep && engineConfig.isPrimary())) {
indexWriter.deleteUnusedFiles();
}
}
Expand Down Expand Up @@ -715,8 +717,8 @@ private ExternalReaderManager createReaderManager(RefreshWarmerListener external
}

private DirectoryReader getDirectoryReader() throws IOException {
// replicas should create the reader from store, we don't want an open IW on replicas.
if (engineConfig.isPrimary() == false) {
// for segment replication: replicas should create the reader from store, we don't want an open IW on replicas.
if (engineConfig.getIndexSettings().isSegrepEnabled() && engineConfig.isPrimary() == false) {
return DirectoryReader.open(store.directory());
}
return DirectoryReader.open(indexWriter);
Expand Down Expand Up @@ -1980,7 +1982,7 @@ public boolean shouldPeriodicallyFlush() {

@Override
public void flush(boolean force, boolean waitIfOngoing) throws EngineException {
if (engineConfig.isPrimary() == false) {
if ((engineConfig.getIndexSettings().isSegrepEnabled()) && (engineConfig.isPrimary() == false)) {
return;
}
ensureOpen();
Expand Down Expand Up @@ -2440,7 +2442,8 @@ protected final void closeNoLock(String reason, CountDownLatch closedLatch) {
// no need to commit in this case!, we snapshot before we close the shard, so translog and all sync'ed
logger.trace("rollback indexWriter");
try {
if (engineConfig.isPrimary()) {
boolean segrep = engineConfig.getIndexSettings().isSegrepEnabled();
if (segrep == false || (segrep && engineConfig.isPrimary())) {
indexWriter.rollback();
}
} catch (AlreadyClosedException ex) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,6 @@

package org.opensearch.index.engine;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.index.DirectoryReader;
Expand All @@ -44,11 +40,14 @@
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.index.StandardDirectoryReader;
import org.apache.lucene.search.ReferenceManager;

import org.apache.lucene.search.SearcherManager;
import org.opensearch.common.SuppressForbidden;
import org.opensearch.common.lucene.index.OpenSearchDirectoryReader;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/**
* Utility class to safely share {@link OpenSearchDirectoryReader} instances across
* multiple threads, while periodically reopening. This class ensures each
Expand Down
112 changes: 60 additions & 52 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -845,8 +845,7 @@ public Engine.IndexResult applyIndexOperationOnReplica(
boolean isRetry,
SourceToParse sourceToParse
) throws IOException {
Boolean isSegRepEnabled = indexSettings.isSegrepEnabled();
if (isSegRepEnabled != null && isSegRepEnabled) {
if (indexSettings.isSegrepEnabled()) {
Engine.Index index;
try {
index = parseSourceAndPrepareIndex(
Expand Down Expand Up @@ -3073,32 +3072,35 @@ public void startRecovery(
case PEER:
try {
markAsRecovering("from " + recoveryState.getSourceNode(), recoveryState);
IndexShard indexShard = this;
segmentReplicationReplicaService.prepareForReplication(
this,
recoveryState.getTargetNode(),
recoveryState.getSourceNode(),
new ActionListener<TrackShardResponse>() {
@Override
public void onResponse(TrackShardResponse unused) {
replicationListener.onReplicationDone(replicationState);
recoveryState.getIndex().setFileDetailsComplete();
finalizeRecovery();
postRecovery("Shard setup complete.");
}
if (indexSettings.isSegrepEnabled()) {
IndexShard indexShard = this;
segmentReplicationReplicaService.prepareForReplication(
this,
recoveryState.getTargetNode(),
recoveryState.getSourceNode(),
new ActionListener<TrackShardResponse>() {
@Override
public void onResponse(TrackShardResponse unused) {
replicationListener.onReplicationDone(replicationState);
recoveryState.getIndex().setFileDetailsComplete();
finalizeRecovery();
postRecovery("Shard setup complete.");
}

@Override
public void onFailure(Exception e) {
replicationListener.onReplicationFailure(
replicationState,
new ReplicationFailedException(indexShard, e),
true
);
@Override
public void onFailure(Exception e) {
replicationListener.onReplicationFailure(
replicationState,
new ReplicationFailedException(indexShard, e),
true
);
}
}
}
);
);
} else {
peerRecoveryTargetService.startRecovery(this, recoveryState.getSourceNode(), recoveryListener);
}
} catch (Exception e) {
logger.error("Error preparing the shard for Segment replication", e);
failShard("corrupted preexisting index", e);
recoveryListener.onRecoveryFailure(recoveryState, new RecoveryFailedException(recoveryState, null, e), true);
}
Expand Down Expand Up @@ -3295,31 +3297,37 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) {
this.warmer.warm(reader);
}
};
return this.engineConfigFactory.newEngineConfig(
shardId,
threadPool,
indexSettings,
warmer,
store,
indexSettings.getMergePolicy(),
mapperService != null ? mapperService.indexAnalyzer() : null,
similarityService.similarity(mapperService),
codecService,
shardEventListener,
indexCache != null ? indexCache.query() : null,
cachingPolicy,
translogConfig,
IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING.get(indexSettings.getSettings()),
Arrays.asList(refreshListeners, refreshPendingLocationListener),
Arrays.asList(new RefreshMetricUpdater(refreshMetric), checkpointRefreshListener),
indexSort,
circuitBreakerService,
globalCheckpointSupplier,
replicationTracker::getRetentionLeases,
() -> getOperationPrimaryTerm(),
tombstoneDocSupplier(),
shardRouting.primary()
);
final List<ReferenceManager.RefreshListener> internalRefreshListener;
if (indexSettings.isSegrepEnabled()) {
internalRefreshListener = Arrays.asList(new RefreshMetricUpdater(refreshMetric), checkpointRefreshListener);
} else {
internalRefreshListener = Collections.singletonList(new RefreshMetricUpdater(refreshMetric));
}
return this.engineConfigFactory.newEngineConfig(
shardId,
threadPool,
indexSettings,
warmer,
store,
indexSettings.getMergePolicy(),
mapperService != null ? mapperService.indexAnalyzer() : null,
similarityService.similarity(mapperService),
codecService,
shardEventListener,
indexCache != null ? indexCache.query() : null,
cachingPolicy,
translogConfig,
IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING.get(indexSettings.getSettings()),
Arrays.asList(refreshListeners, refreshPendingLocationListener),
internalRefreshListener,
indexSort,
circuitBreakerService,
globalCheckpointSupplier,
replicationTracker::getRetentionLeases,
() -> getOperationPrimaryTerm(),
tombstoneDocSupplier(),
shardRouting.primary()
);
}

/**
Expand Down Expand Up @@ -3910,8 +3918,8 @@ ReplicationTracker getReplicationTracker() {
public boolean scheduledRefresh() {
// skip if not primary shard.
// TODO: Segrep - should split into primary/replica classes.
if (shardRouting.primary() == false) {
return false;
if ((indexSettings.isSegrepEnabled()) &&(shardRouting.primary() == false) ) {
return false;
}
verifyNotClosed();
boolean listenerNeedsRefresh = refreshListeners.refreshNeeded();
Expand Down

0 comments on commit cd03754

Please sign in to comment.