Skip to content

Commit

Permalink
spotless check
Browse files Browse the repository at this point in the history
Signed-off-by: Poojita Raj <[email protected]>
  • Loading branch information
Poojita-Raj committed May 31, 2023
1 parent 137177c commit 21f7a75
Show file tree
Hide file tree
Showing 14 changed files with 78 additions and 86 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Added @dbwiddis as on OpenSearch maintainer ([#7665](https://github.com/opensearch-project/OpenSearch/pull/7665))
- [Extensions] Add ExtensionAwarePlugin extension point to add custom settings for extensions ([#7526](https://github.com/opensearch-project/OpenSearch/pull/7526))
- Add new cluster setting to set default index replication type ([#7420](https://github.com/opensearch-project/OpenSearch/pull/7420))
- [Segment Replication] Rolling upgrade support for default codecs ([#7698](https://github.com/opensearch-project/OpenSearch/pull/7698))

### Dependencies
- Bump `com.netflix.nebula:gradle-info-plugin` from 12.0.0 to 12.1.3 (#7564)
Expand Down Expand Up @@ -155,4 +156,4 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
### Security

[Unreleased 3.0]: https://github.com/opensearch-project/OpenSearch/compare/2.x...HEAD
[Unreleased 2.x]: https://github.com/opensearch-project/OpenSearch/compare/2.7...2.x
[Unreleased 2.x]: https://github.com/opensearch-project/OpenSearch/compare/2.7...2.x
Original file line number Diff line number Diff line change
Expand Up @@ -81,22 +81,9 @@ public CodecService(@Nullable MapperService mapperService, Logger logger) {

public void loadMap() {
opensearchVersionToLuceneCodec.put(Version.fromString("3.0.0"), "Lucene95");
opensearchVersionToLuceneCodec.put(Version.fromString("2.8.0"), "Lucene94");
opensearchVersionToLuceneCodec.put(Version.fromString("2.8.0"), "Lucene95");
opensearchVersionToLuceneCodec.put(Version.fromString("2.7.1"), "Lucene95");
opensearchVersionToLuceneCodec.put(Version.fromString("2.7.0"), "Lucene95");
opensearchVersionToLuceneCodec.put(Version.fromString("2.6.0"), "Lucene95");
opensearchVersionToLuceneCodec.put(Version.fromString("2.4.2"), "Lucene95");
opensearchVersionToLuceneCodec.put(Version.fromString("2.5.1"), "Lucene94");
opensearchVersionToLuceneCodec.put(Version.fromString("2.5.0"), "Lucene94");
opensearchVersionToLuceneCodec.put(Version.fromString("2.4.1"), "Lucene94");
opensearchVersionToLuceneCodec.put(Version.fromString("2.4.1"), "Lucene94");
opensearchVersionToLuceneCodec.put(Version.fromString("2.4.0"), "Lucene94");
opensearchVersionToLuceneCodec.put(Version.fromString("2.3.0"), "Lucene93");
opensearchVersionToLuceneCodec.put(Version.fromString("2.2.1"), "Lucene93");
opensearchVersionToLuceneCodec.put(Version.fromString("2.2.0"), "Lucene93");
opensearchVersionToLuceneCodec.put(Version.fromString("2.1.0"), "Lucene92");
opensearchVersionToLuceneCodec.put(Version.fromString("2.0.1"), "Lucene91");
opensearchVersionToLuceneCodec.put(Version.fromString("2.0.0"), "Lucene91");
}

public Codec codec(String name) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
package org.opensearch.index.engine;

import org.apache.logging.log4j.Logger;
import org.apache.lucene.codecs.Codec;
import org.apache.lucene.document.LongPoint;
import org.apache.lucene.document.NumericDocValuesField;
import org.apache.lucene.index.DirectoryReader;
Expand Down Expand Up @@ -90,7 +89,6 @@
import org.opensearch.common.util.io.IOUtils;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.VersionType;
import org.opensearch.index.codec.CodecService;
import org.opensearch.index.fieldvisitor.IdOnlyFieldVisitor;
import org.opensearch.index.mapper.IdFieldMapper;
import org.opensearch.index.mapper.ParseContext;
Expand Down Expand Up @@ -2319,9 +2317,7 @@ private IndexWriterConfig getIndexWriterConfig() {
iwc.setMergePolicy(new OpenSearchMergePolicy(mergePolicy));
iwc.setSimilarity(engineConfig.getSimilarity());
iwc.setRAMBufferSizeMB(engineConfig.getIndexingBufferSize().getMbFrac());
Codec bwcCodec = engineConfig.getBWCCodec(CodecService.opensearchVersionToLuceneCodec.get(engineConfig.getClusterMinVersion()));
iwc.setCodec(bwcCodec);
engineConfig.setCodecName(bwcCodec.getName());
iwc.setCodec(engineConfig.getCodec());
iwc.setUseCompoundFile(true); // always use compound on flush - reduces # of file-handles on refresh
if (config().getIndexSort() != null) {
iwc.setIndexSort(config().getIndexSort());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,9 @@

package org.opensearch.index.engine;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.index.codec.CodecService;

/**
* Engine Factory implementation used with Segment Replication that wires up replica shards with an ${@link NRTReplicationEngine}
Expand All @@ -27,10 +26,6 @@ public NRTReplicationEngineFactory(ClusterService clusterService) {
this.clusterService = clusterService;
}

public NRTReplicationEngineFactory() {
this.clusterService = null;
}

@Override
public Engine newReadWriteEngine(EngineConfig config) {
if (config.isReadOnlyReplica()) {
Expand All @@ -39,6 +34,7 @@ public Engine newReadWriteEngine(EngineConfig config) {
if (clusterService != null) {
DiscoveryNodes nodes = this.clusterService.state().nodes();
config.setClusterMinVersion(nodes.getMinNodeVersion());
config.setCodecName(config.getBWCCodec(CodecService.opensearchVersionToLuceneCodec.get(nodes.getMinNodeVersion())).getName());
}
return new InternalEngine(config);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.util.ThreadInterruptedException;
import org.opensearch.Version;
import org.opensearch.cluster.metadata.DataStream;
import org.opensearch.core.Assertions;
import org.opensearch.ExceptionsHelper;
Expand Down Expand Up @@ -1789,7 +1788,7 @@ static Engine.Searcher wrapSearcher(
}

/**
* Used with segment replication during relocation handoff, this method updates current read only engine to global
 * Used with segment replication during relocation handoff and rolling upgrades, this method updates the current read-only engine to the global
 * checkpoint, then switches it to a writeable engine
*
* @throws IOException if communication failed
Expand All @@ -1798,7 +1797,7 @@ static Engine.Searcher wrapSearcher(
*
* @opensearch.internal
*/
public void resetToWriteableEngine() throws IOException, InterruptedException, TimeoutException {
public void resetEngine() throws IOException, InterruptedException, TimeoutException {
indexShardOperationPermits.blockOperations(30, TimeUnit.MINUTES, () -> { resetEngineToGlobalCheckpoint(); });
}

Expand Down Expand Up @@ -4346,7 +4345,7 @@ public ParsedDocument newNoopTombstoneDoc(String reason) {
/**
* Rollback the current engine to the safe commit, then replay local translog up to the global checkpoint.
*/
public void resetEngineToGlobalCheckpoint() throws IOException {
void resetEngineToGlobalCheckpoint() throws IOException {
assert Thread.holdsLock(mutex) == false : "resetting engine under mutex";
assert getActiveOperationsCount() == OPERATIONS_BLOCKED : "resetting engine without blocking operations; active operations are ["
+ getActiveOperations()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,7 @@ public class IndicesService extends AbstractLifecycleComponent
private final ScriptService scriptService;
private final ClusterService clusterService;
private final Client client;
public volatile Map<String, IndexService> indices = emptyMap();
private volatile Map<String, IndexService> indices = emptyMap();
private final Map<Index, List<PendingDelete>> pendingDeletes = new HashMap<>();
private final AtomicInteger numUncompletedDeletes = new AtomicInteger();
private final OldShardsStats oldShardsStats = new OldShardsStats();
Expand Down Expand Up @@ -474,10 +474,6 @@ public ClusterService clusterService() {
return clusterService;
}

public Map<String, IndexService> indices() {
return indices;
}

@Override
protected void doStop() {
ThreadPool.terminate(danglingIndicesThreadPoolExecutor, 10, TimeUnit.SECONDS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -449,7 +449,7 @@ public void onReplicationDone(SegmentReplicationState state) {
try {
// Promote engine type for primary target
if (indexShard.recoveryState().getPrimary() == true) {
indexShard.resetToWriteableEngine();
indexShard.resetEngine();
}
channel.sendResponse(TransportResponse.Empty.INSTANCE);
} catch (InterruptedException | TimeoutException | IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,11 @@
import java.util.ArrayList;
import java.util.List;

/**
 * SegmentReplicationUpgradeListener is used to upgrade the OpenSearch version used by all primaries of a cluster when
 * segment replication is enabled and a rolling upgrade is completed (while in a mixed cluster state, the primaries use a
 * lower codec version, which needs to be reset once the upgrade is complete).
*/
public class SegmentReplicationUpgradeListener implements ClusterStateListener {

private static final Logger logger = LogManager.getLogger(SegmentReplicationUpgradeListener.class);
Expand All @@ -33,29 +38,34 @@ public SegmentReplicationUpgradeListener(IndicesService indicesService) {

@Override
public void clusterChanged(ClusterChangedEvent event) {
List<IndexShard> indexShardList = new ArrayList<>();
DiscoveryNodes nodes = event.state().nodes();
if (nodes.getMinNodeVersion().equals(nodes.getMaxNodeVersion())) {
for (IndexService indexService : indicesService.indices().values()) {
for (IndexShard indexShard : indexService) {
try {
if ((indexShard.getEngine().config().isReadOnlyReplica() == false) && (indexShard.getEngine().config().getClusterMinVersion() != nodes.getMaxNodeVersion())) {
indexShardList.add(indexShard);
if (event.nodesChanged()) {
List<IndexShard> indexShardList = new ArrayList<>();
DiscoveryNodes nodes = event.state().nodes();
if (nodes.getMinNodeVersion().equals(nodes.getMaxNodeVersion())) {
for (IndexService indexService : indicesService) {
for (IndexShard indexShard : indexService) {
try {
if (indexShard.indexSettings().isSegRepEnabled()
&& indexShard.indexSettings().getNumberOfReplicas() > 0
&& indexShard.routingEntry().primary()
&& (indexShard.getEngine().config().getClusterMinVersion() != nodes.getMaxNodeVersion())) {
indexShardList.add(indexShard);
}
} catch (AlreadyClosedException e) {
logger.warn("Index shard [{}] engine is already closed.", indexShard.shardId());
}
} catch (AlreadyClosedException e) {
logger.warn("Index shard [{}] engine is already closed.", indexShard.shardId());
}
}
}
}
try {
if (indexShardList.isEmpty() == false) {
for (IndexShard is : indexShardList) {
is.resetEngineToGlobalCheckpoint();
try {
if (indexShardList.isEmpty() == false) {
for (IndexShard indexShard : indexShardList) {
indexShard.resetEngine();
}
}
} catch (Exception e) {
logger.error("Received unexpected exception: [{}]", e.getMessage());
}
} catch (Exception e) {
logger.error("Received unexpected exception: [{}]", e.getMessage());
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@
import org.opensearch.common.lease.Releasable;
import org.opensearch.indices.IndicesService;

/**
* SegmentReplicationUpgradeService is used to manage SegmentReplicationUpgradeListener's lifecycle (creation/deletion).
*/
public class SegmentReplicationUpgradeService implements Releasable {

private final ClusterApplierService clusterApplierService;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public class SegmentReplicationPressureServiceTests extends OpenSearchIndexLevel
.build();

public void testIsSegrepLimitBreached() throws Exception {
try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory())) {
try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory(null))) {
shards.startAll();
final IndexShard primaryShard = shards.getPrimary();
SegmentReplicationPressureService service = buildPressureService(settings, primaryShard);
Expand Down Expand Up @@ -104,7 +104,7 @@ public void testIsSegrepLimitBreached_onlyCheckpointLimitBreached() throws Excep
.put(SEGMENT_REPLICATION_INDEXING_PRESSURE_ENABLED.getKey(), true)
.build();

try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory())) {
try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory(null))) {
shards.startAll();
final IndexShard primaryShard = shards.getPrimary();
SegmentReplicationPressureService service = buildPressureService(settings, primaryShard);
Expand All @@ -131,7 +131,7 @@ public void testIsSegrepLimitBreached_onlyTimeLimitBreached() throws Exception {
.put(SEGMENT_REPLICATION_INDEXING_PRESSURE_ENABLED.getKey(), true)
.build();

try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory())) {
try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory(null))) {
shards.startAll();
final IndexShard primaryShard = shards.getPrimary();
final SegmentReplicationPressureService service = buildPressureService(settings, primaryShard);
Expand All @@ -154,7 +154,7 @@ public void testIsSegrepLimitBreached_onlyTimeLimitBreached() throws Exception {
}

public void testIsSegrepLimitBreached_underStaleNodeLimit() throws Exception {
try (ReplicationGroup shards = createGroup(3, settings, new NRTReplicationEngineFactory())) {
try (ReplicationGroup shards = createGroup(3, settings, new NRTReplicationEngineFactory(null))) {
shards.startAll();
final IndexShard primaryShard = shards.getPrimary();
indexInBatches(5, shards, primaryShard);
Expand Down Expand Up @@ -198,7 +198,7 @@ public void testFailStaleReplicaTask() throws Exception {
.put(MAX_REPLICATION_TIME_SETTING.getKey(), TimeValue.timeValueMillis(10))
.build();

try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory())) {
try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory(null))) {
shards.startAll();
final IndexShard primaryShard = shards.getPrimary();
SegmentReplicationPressureService service = buildPressureService(settings, primaryShard);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
package org.opensearch.index.seqno;

import org.apache.lucene.codecs.Codec;
import org.opensearch.Version;
import org.opensearch.action.ActionListener;
import org.opensearch.action.support.replication.ReplicationResponse;
import org.opensearch.cluster.routing.AllocationId;
Expand Down Expand Up @@ -1807,23 +1808,26 @@ public void testSegmentReplicationCheckpointTracking() {
1,
1,
1L,
Codec.getDefault().getName()
Codec.getDefault().getName(),
Version.CURRENT
);
final ReplicationCheckpoint secondCheckpoint = new ReplicationCheckpoint(
tracker.shardId(),
0L,
2,
2,
50L,
Codec.getDefault().getName()
Codec.getDefault().getName(),
Version.CURRENT
);
final ReplicationCheckpoint thirdCheckpoint = new ReplicationCheckpoint(
tracker.shardId(),
0L,
2,
3,
100L,
Codec.getDefault().getName()
Codec.getDefault().getName(),
Version.CURRENT
);

tracker.setLatestReplicationCheckpoint(initialCheckpoint);
Expand Down
Loading

0 comments on commit 21f7a75

Please sign in to comment.