Skip to content

Commit

Permalink
refactoring
Browse files Browse the repository at this point in the history
Signed-off-by: Poojita Raj <[email protected]>
  • Loading branch information
Poojita-Raj committed May 24, 2023
1 parent e74aa44 commit fac5d45
Show file tree
Hide file tree
Showing 13 changed files with 86 additions and 76 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Provide mechanism to configure XContent parsing constraints (after update to Jackson 2.15.0 and above) ([#7550](https://github.com/opensearch-project/OpenSearch/pull/7550))
- Support to clear filecache using clear indices cache API ([#7498](https://github.com/opensearch-project/OpenSearch/pull/7498))
- Create NamedRoute to map extension routes to a shortened name ([#6870](https://github.com/opensearch-project/OpenSearch/pull/6870))
- [Segment Replication] Rolling upgrade support for default codecs ([#7698](https://github.com/opensearch-project/OpenSearch/pull/7698))

### Dependencies
- Bump `com.netflix.nebula:gradle-info-plugin` from 12.0.0 to 12.1.3 (#7564)
Expand Down Expand Up @@ -140,4 +141,4 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
### Security

[Unreleased 3.0]: https://github.com/opensearch-project/OpenSearch/compare/2.x...HEAD
[Unreleased 2.x]: https://github.com/opensearch-project/OpenSearch/compare/2.7...2.x
[Unreleased 2.x]: https://github.com/opensearch-project/OpenSearch/compare/2.7...2.x
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
package org.opensearch.index.engine;

import org.apache.logging.log4j.Logger;
import org.apache.lucene.codecs.Codec;
import org.apache.lucene.document.LongPoint;
import org.apache.lucene.document.NumericDocValuesField;
import org.apache.lucene.index.DirectoryReader;
Expand Down Expand Up @@ -90,7 +89,6 @@
import org.opensearch.common.util.io.IOUtils;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.VersionType;
import org.opensearch.index.codec.CodecService;
import org.opensearch.index.fieldvisitor.IdOnlyFieldVisitor;
import org.opensearch.index.mapper.IdFieldMapper;
import org.opensearch.index.mapper.ParseContext;
Expand Down Expand Up @@ -2319,9 +2317,7 @@ private IndexWriterConfig getIndexWriterConfig() {
iwc.setMergePolicy(new OpenSearchMergePolicy(mergePolicy));
iwc.setSimilarity(engineConfig.getSimilarity());
iwc.setRAMBufferSizeMB(engineConfig.getIndexingBufferSize().getMbFrac());
Codec bwcCodec = engineConfig.getBWCCodec(CodecService.opensearchVersionToLuceneCodec.get(engineConfig.getClusterMinVersion()));
iwc.setCodec(bwcCodec);
engineConfig.setCodecName(bwcCodec.getName());
iwc.setCodec(engineConfig.getCodec());
iwc.setUseCompoundFile(true); // always use compound on flush - reduces # of file-handles on refresh
if (config().getIndexSort() != null) {
iwc.setIndexSort(config().getIndexSort());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.index.codec.CodecService;

/**
 * Engine Factory implementation used with Segment Replication that wires up replica shards with a {@link NRTReplicationEngine}
Expand All @@ -25,10 +26,6 @@ public NRTReplicationEngineFactory(ClusterService clusterService) {
this.clusterService = clusterService;
}

public NRTReplicationEngineFactory() {
this.clusterService = null;
}

@Override
public Engine newReadWriteEngine(EngineConfig config) {
if (config.isReadOnlyReplica()) {
Expand All @@ -37,6 +34,7 @@ public Engine newReadWriteEngine(EngineConfig config) {
if (clusterService != null) {
DiscoveryNodes nodes = this.clusterService.state().nodes();
config.setClusterMinVersion(nodes.getMinNodeVersion());
config.setCodecName(config.getBWCCodec(CodecService.opensearchVersionToLuceneCodec.get(nodes.getMinNodeVersion())).getName());
}
return new InternalEngine(config);
}
Expand Down
11 changes: 2 additions & 9 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -1632,13 +1632,6 @@ public final boolean shouldProcessCheckpoint(ReplicationCheckpoint requestCheckp
);
return false;
}
// if replica's OS version is on or after primary version, then can process checkpoint
if (localNodeVersion.onOrAfter(requestCheckpoint.getMinVersion()) == false) {
logger.trace(
() -> new ParameterizedMessage("Shard does not support the received lucene codec version {}", requestCheckpoint.getCodec())
);
return false;
}
return true;
}

Expand Down Expand Up @@ -1797,7 +1790,7 @@ static Engine.Searcher wrapSearcher(
}

/**
* Used with segment replication during relocation handoff, this method updates current read only engine to global
* Used with segment replication during relocation handoff and rolling upgrades, this method updates current read only engine to global
* checkpoint followed by changing to writeable engine
*
* @throws IOException if communication failed
Expand All @@ -1806,7 +1799,7 @@ static Engine.Searcher wrapSearcher(
*
* @opensearch.internal
*/
public void resetToWriteableEngine() throws IOException, InterruptedException, TimeoutException {
public void resetEngine() throws IOException, InterruptedException, TimeoutException {
indexShardOperationPermits.blockOperations(30, TimeUnit.MINUTES, () -> { resetEngineToGlobalCheckpoint(); });
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,7 @@ public class IndicesService extends AbstractLifecycleComponent
private final ScriptService scriptService;
private final ClusterService clusterService;
private final Client client;
public volatile Map<String, IndexService> indices = emptyMap();
private volatile Map<String, IndexService> indices = emptyMap();
private final Map<Index, List<PendingDelete>> pendingDeletes = new HashMap<>();
private final AtomicInteger numUncompletedDeletes = new AtomicInteger();
private final OldShardsStats oldShardsStats = new OldShardsStats();
Expand Down Expand Up @@ -478,10 +478,6 @@ public ClusterService clusterService() {
return clusterService;
}

public Map<String, IndexService> indices() {
return indices;
}

@Override
protected void doStop() {
ThreadPool.terminate(danglingIndicesThreadPoolExecutor, 10, TimeUnit.SECONDS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,18 @@ public synchronized void onNewCheckpoint(final ReplicationCheckpoint receivedChe
}
final Thread thread = Thread.currentThread();
Version localNodeVersion = indicesService.clusterService().state().nodes().getLocalNode().getVersion();
        // if the replica's OpenSearch version is older than the primary's, ignore the checkpoint
if (localNodeVersion.onOrAfter(receivedCheckpoint.getMinVersion()) == false) {
logger.trace(
() -> new ParameterizedMessage(
"Ignoring checkpoint, shard not started {} {}\n Shard does not support the received lucene codec version {}",
receivedCheckpoint,
replicaShard.state(),
receivedCheckpoint.getCodec()
)
);
return;
}
if (replicaShard.shouldProcessCheckpoint(receivedCheckpoint, localNodeVersion)) {
startReplication(receivedCheckpoint, replicaShard, new SegmentReplicationListener() {
@Override
Expand Down Expand Up @@ -449,7 +461,7 @@ public void onReplicationDone(SegmentReplicationState state) {
try {
// Promote engine type for primary target
if (indexShard.recoveryState().getPrimary() == true) {
indexShard.resetToWriteableEngine();
indexShard.resetEngine();
}
channel.sendResponse(TransportResponse.Empty.INSTANCE);
} catch (InterruptedException | TimeoutException | IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,11 @@
import java.util.ArrayList;
import java.util.List;

/**
 * SegmentReplicationUpgradeListener upgrades the OpenSearch (codec) version used by all primary shards of a cluster
 * once segment replication is enabled and a rolling upgrade has completed. While the cluster is in a mixed-version
 * state, primaries write with the lower codec version; this needs to be reset once the upgrade is complete.
 */
public class SegmentReplicationUpgradeListener implements ClusterStateListener {

private static final Logger logger = LogManager.getLogger(SegmentReplicationUpgradeListener.class);
Expand All @@ -33,30 +38,32 @@ public SegmentReplicationUpgradeListener(IndicesService indicesService) {

@Override
public void clusterChanged(ClusterChangedEvent event) {
List<IndexShard> indexShardList = new ArrayList<>();
DiscoveryNodes nodes = event.state().nodes();
if (nodes.getMinNodeVersion().equals(nodes.getMaxNodeVersion())) {
for (IndexService indexService : indicesService.indices().values()) {
for (IndexShard indexShard : indexService) {
try {
if ((indexShard.getEngine().config().isReadOnlyReplica() == false)
&& (indexShard.getEngine().config().getClusterMinVersion() != nodes.getMaxNodeVersion())) {
indexShardList.add(indexShard);
if (event.nodesChanged()) {
List<IndexShard> indexShardList = new ArrayList<>();
DiscoveryNodes nodes = event.state().nodes();
if (nodes.getMinNodeVersion().equals(nodes.getMaxNodeVersion())) {
for (IndexService indexService : indicesService) {
for (IndexShard indexShard : indexService) {
try {
if (indexShard.indexSettings().isSegRepEnabled() && indexShard.indexSettings().getNumberOfReplicas() > 0 && indexShard.routingEntry().primary()
&& (indexShard.getEngine().config().getClusterMinVersion() != nodes.getMaxNodeVersion())) {
indexShardList.add(indexShard);
}
} catch (AlreadyClosedException e) {
logger.warn("Index shard [{}] engine is already closed.", indexShard.shardId());
}
} catch (AlreadyClosedException e) {
logger.warn("Index shard [{}] engine is already closed.", indexShard.shardId());
}
}
}
}
try {
if (indexShardList.isEmpty() == false) {
for (IndexShard is : indexShardList) {
is.resetEngineToGlobalCheckpoint();
try {
if (indexShardList.isEmpty() == false) {
for (IndexShard indexShard : indexShardList) {
indexShard.resetEngine();
}
}
} catch (Exception e) {
logger.error("Received unexpected exception: [{}]", e.getMessage());
}
} catch (Exception e) {
logger.error("Received unexpected exception: [{}]", e.getMessage());
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@
import org.opensearch.common.lease.Releasable;
import org.opensearch.indices.IndicesService;

/**
* SegmentReplicationUpgradeService is used to manage SegmentReplicationUpgradeListener's lifecycle (creation/deletion).
*/
public class SegmentReplicationUpgradeService implements Releasable {

private final ClusterApplierService clusterApplierService;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public class SegmentReplicationPressureServiceTests extends OpenSearchIndexLevel
.build();

public void testIsSegrepLimitBreached() throws Exception {
try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory())) {
try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory(null))) {
shards.startAll();
final IndexShard primaryShard = shards.getPrimary();
SegmentReplicationPressureService service = buildPressureService(settings, primaryShard);
Expand Down Expand Up @@ -104,7 +104,7 @@ public void testIsSegrepLimitBreached_onlyCheckpointLimitBreached() throws Excep
.put(SEGMENT_REPLICATION_INDEXING_PRESSURE_ENABLED.getKey(), true)
.build();

try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory())) {
try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory(null))) {
shards.startAll();
final IndexShard primaryShard = shards.getPrimary();
SegmentReplicationPressureService service = buildPressureService(settings, primaryShard);
Expand All @@ -131,7 +131,7 @@ public void testIsSegrepLimitBreached_onlyTimeLimitBreached() throws Exception {
.put(SEGMENT_REPLICATION_INDEXING_PRESSURE_ENABLED.getKey(), true)
.build();

try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory())) {
try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory(null))) {
shards.startAll();
final IndexShard primaryShard = shards.getPrimary();
final SegmentReplicationPressureService service = buildPressureService(settings, primaryShard);
Expand All @@ -154,7 +154,7 @@ public void testIsSegrepLimitBreached_onlyTimeLimitBreached() throws Exception {
}

public void testIsSegrepLimitBreached_underStaleNodeLimit() throws Exception {
try (ReplicationGroup shards = createGroup(3, settings, new NRTReplicationEngineFactory())) {
try (ReplicationGroup shards = createGroup(3, settings, new NRTReplicationEngineFactory(null))) {
shards.startAll();
final IndexShard primaryShard = shards.getPrimary();
indexInBatches(5, shards, primaryShard);
Expand Down Expand Up @@ -198,7 +198,7 @@ public void testFailStaleReplicaTask() throws Exception {
.put(MAX_REPLICATION_TIME_SETTING.getKey(), TimeValue.timeValueMillis(10))
.build();

try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory())) {
try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory(null))) {
shards.startAll();
final IndexShard primaryShard = shards.getPrimary();
SegmentReplicationPressureService service = buildPressureService(settings, primaryShard);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
package org.opensearch.index.seqno;

import org.apache.lucene.codecs.Codec;
import org.opensearch.Version;
import org.opensearch.action.ActionListener;
import org.opensearch.action.support.replication.ReplicationResponse;
import org.opensearch.cluster.routing.AllocationId;
Expand Down Expand Up @@ -1807,23 +1808,26 @@ public void testSegmentReplicationCheckpointTracking() {
1,
1,
1L,
Codec.getDefault().getName()
Codec.getDefault().getName(),
Version.CURRENT
);
final ReplicationCheckpoint secondCheckpoint = new ReplicationCheckpoint(
tracker.shardId(),
0L,
2,
2,
50L,
Codec.getDefault().getName()
Codec.getDefault().getName(),
Version.CURRENT
);
final ReplicationCheckpoint thirdCheckpoint = new ReplicationCheckpoint(
tracker.shardId(),
0L,
2,
3,
100L,
Codec.getDefault().getName()
Codec.getDefault().getName(),
Version.CURRENT
);

tracker.setLatestReplicationCheckpoint(initialCheckpoint);
Expand Down
Loading

0 comments on commit fac5d45

Please sign in to comment.