Skip to content

Commit

Permalink
Incorporate PR review feedback
Browse files Browse the repository at this point in the history
Signed-off-by: Ashish Singh <[email protected]>
  • Loading branch information
ashking94 committed Jan 5, 2023
1 parent 3a56b49 commit 7914084
Show file tree
Hide file tree
Showing 5 changed files with 34 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -699,7 +699,7 @@ public static final IndexShard newIndexShard(
() -> {},
RetentionLeaseSyncer.EMPTY,
cbs,
(idxSettings, shardRouting) -> new InternalTranslogFactory(),
(indexSettings, shardRouting) -> new InternalTranslogFactory(),
SegmentReplicationCheckpointPublisher.EMPTY,
null
);
Expand Down
6 changes: 4 additions & 2 deletions server/src/main/java/org/opensearch/index/IndexModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.opensearch.Version;
import org.opensearch.client.Client;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.CheckedFunction;
import org.opensearch.common.TriFunction;
Expand Down Expand Up @@ -71,6 +72,7 @@
import org.opensearch.index.similarity.SimilarityService;
import org.opensearch.index.store.FsDirectoryFactory;
import org.opensearch.index.store.remote.directory.RemoteSnapshotDirectoryFactory;
import org.opensearch.index.translog.TranslogFactory;
import org.opensearch.indices.IndicesQueryCache;
import org.opensearch.indices.breaker.CircuitBreakerService;
import org.opensearch.indices.fielddata.cache.IndicesFieldDataCache;
Expand Down Expand Up @@ -493,7 +495,7 @@ public IndexService newIndexService(
BooleanSupplier idFieldDataEnabled,
ValuesSourceRegistry valuesSourceRegistry,
IndexStorePlugin.RemoteDirectoryFactory remoteDirectoryFactory,
Supplier<RepositoriesService> repositoriesServiceSupplier
BiFunction<IndexSettings, ShardRouting, TranslogFactory> translogFactorySupplier
) throws IOException {
final IndexEventListener eventListener = freeze();
Function<IndexService, CheckedFunction<DirectoryReader, DirectoryReader, IOException>> readerWrapperFactory = indexReaderWrapper
Expand Down Expand Up @@ -549,7 +551,7 @@ public IndexService newIndexService(
expressionResolver,
valuesSourceRegistry,
recoveryStateFactory,
repositoriesServiceSupplier
translogFactorySupplier
);
success = true;
return indexService;
Expand Down
20 changes: 3 additions & 17 deletions server/src/main/java/org/opensearch/index/IndexService.java
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,6 @@
import org.opensearch.index.shard.ShardPath;
import org.opensearch.index.similarity.SimilarityService;
import org.opensearch.index.store.Store;
import org.opensearch.index.translog.InternalTranslogFactory;
import org.opensearch.index.translog.RemoteBlobStoreInternalTranslogFactory;
import org.opensearch.index.translog.Translog;
import org.opensearch.index.translog.TranslogFactory;
import org.opensearch.indices.breaker.CircuitBreakerService;
Expand All @@ -99,7 +97,6 @@
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
import org.opensearch.plugins.IndexStorePlugin;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.script.ScriptService;
import org.opensearch.search.aggregations.support.ValuesSourceRegistry;
import org.opensearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -177,7 +174,7 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
private final IndexNameExpressionResolver expressionResolver;
private final Supplier<Sort> indexSortSupplier;
private final ValuesSourceRegistry valuesSourceRegistry;
private final Supplier<RepositoriesService> repositoriesServiceSupplier;
private final BiFunction<IndexSettings, ShardRouting, TranslogFactory> translogFactorySupplier;

public IndexService(
IndexSettings indexSettings,
Expand Down Expand Up @@ -210,7 +207,7 @@ public IndexService(
IndexNameExpressionResolver expressionResolver,
ValuesSourceRegistry valuesSourceRegistry,
IndexStorePlugin.RecoveryStateFactory recoveryStateFactory,
Supplier<RepositoriesService> repositoriesServiceSupplier
BiFunction<IndexSettings, ShardRouting, TranslogFactory> translogFactorySupplier
) {
super(indexSettings);
this.allowExpensiveQueries = allowExpensiveQueries;
Expand Down Expand Up @@ -282,7 +279,7 @@ public IndexService(
this.trimTranslogTask = new AsyncTrimTranslogTask(this);
this.globalCheckpointTask = new AsyncGlobalCheckpointTask(this);
this.retentionLeaseSyncTask = new AsyncRetentionLeaseSyncTask(this);
this.repositoriesServiceSupplier = repositoriesServiceSupplier;
this.translogFactorySupplier = translogFactorySupplier;
updateFsyncTaskIfNecessary();
}

Expand Down Expand Up @@ -525,17 +522,6 @@ public synchronized IndexShard createShard(
remoteStore = new Store(shardId, this.indexSettings, remoteDirectory, lock, Store.OnClose.EMPTY);
}

BiFunction<IndexSettings, ShardRouting, TranslogFactory> translogFactorySupplier = (idxSettings, shardRouting) -> {
if (idxSettings.isRemoteTranslogStoreEnabled() && shardRouting.primary()) {
return new RemoteBlobStoreInternalTranslogFactory(
repositoriesServiceSupplier,
threadPool,
idxSettings.getRemoteStoreTranslogRepository()
);
}
return new InternalTranslogFactory();
};

Directory directory = directoryFactory.newDirectory(this.indexSettings, path);
store = new Store(
shardId,
Expand Down
15 changes: 8 additions & 7 deletions server/src/main/java/org/opensearch/indices/IndicesService.java
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@
import org.opensearch.index.shard.IndexingOperationListener;
import org.opensearch.index.shard.IndexingStats;
import org.opensearch.index.shard.ShardId;
import org.opensearch.index.translog.TranslogFactory;
import org.opensearch.index.translog.TranslogStats;
import org.opensearch.indices.breaker.CircuitBreakerService;
import org.opensearch.indices.cluster.IndicesClusterStateService;
Expand Down Expand Up @@ -177,10 +178,10 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.LongSupplier;
import java.util.function.Supplier;
import java.util.function.Predicate;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -270,7 +271,7 @@ public class IndicesService extends AbstractLifecycleComponent
private final boolean nodeWriteDanglingIndicesInfo;
private final ValuesSourceRegistry valuesSourceRegistry;
private final IndexStorePlugin.RemoteDirectoryFactory remoteDirectoryFactory;
private final Supplier<RepositoriesService> repositoriesServiceSupplier;
private final BiFunction<IndexSettings, ShardRouting, TranslogFactory> translogFactorySupplier;

@Override
protected void doStart() {
Expand Down Expand Up @@ -300,7 +301,7 @@ public IndicesService(
ValuesSourceRegistry valuesSourceRegistry,
Map<String, IndexStorePlugin.RecoveryStateFactory> recoveryStateFactories,
IndexStorePlugin.RemoteDirectoryFactory remoteDirectoryFactory,
Supplier<RepositoriesService> repositoriesServiceSupplier
BiFunction<IndexSettings, ShardRouting, TranslogFactory> translogFactorySupplier
) {
this.settings = settings;
this.threadPool = threadPool;
Expand Down Expand Up @@ -389,7 +390,7 @@ protected void closeInternal() {
this.allowExpensiveQueries = ALLOW_EXPENSIVE_QUERIES.get(clusterService.getSettings());
clusterService.getClusterSettings().addSettingsUpdateConsumer(ALLOW_EXPENSIVE_QUERIES, this::setAllowExpensiveQueries);
this.remoteDirectoryFactory = remoteDirectoryFactory;
this.repositoriesServiceSupplier = repositoriesServiceSupplier;
this.translogFactorySupplier = translogFactorySupplier;
}

public IndicesService(
Expand All @@ -415,7 +416,7 @@ public IndicesService(
ValuesSourceRegistry valuesSourceRegistry,
Map<String, IndexStorePlugin.RecoveryStateFactory> recoveryStateFactories,
IndexStorePlugin.RemoteDirectoryFactory remoteDirectoryFactory,
Supplier<RepositoriesService> repositoriesServiceSupplier
BiFunction<IndexSettings, ShardRouting, TranslogFactory> translogFactorySupplier
) {
this.settings = settings;
this.threadPool = threadPool;
Expand Down Expand Up @@ -504,7 +505,7 @@ protected void closeInternal() {
this.allowExpensiveQueries = ALLOW_EXPENSIVE_QUERIES.get(clusterService.getSettings());
clusterService.getClusterSettings().addSettingsUpdateConsumer(ALLOW_EXPENSIVE_QUERIES, this::setAllowExpensiveQueries);
this.remoteDirectoryFactory = remoteDirectoryFactory;
this.repositoriesServiceSupplier = repositoriesServiceSupplier;
this.translogFactorySupplier = translogFactorySupplier;
}

private static final String DANGLING_INDICES_UPDATE_THREAD_NAME = "DanglingIndices#updateTask";
Expand Down Expand Up @@ -868,7 +869,7 @@ private synchronized IndexService createIndexService(
this::isIdFieldDataEnabled,
valuesSourceRegistry,
remoteDirectoryFactory,
repositoriesServiceSupplier
translogFactorySupplier
);
}

Expand Down
20 changes: 18 additions & 2 deletions server/src/main/java/org/opensearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,14 @@
import org.apache.logging.log4j.Logger;
import org.apache.lucene.util.Constants;
import org.apache.lucene.util.SetOnce;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.cluster.routing.allocation.AwarenessReplicaBalance;
import org.opensearch.index.IndexModule;
import org.opensearch.index.IndexingPressureService;
import org.opensearch.index.translog.InternalTranslogFactory;
import org.opensearch.index.translog.RemoteBlobStoreInternalTranslogFactory;
import org.opensearch.index.translog.TranslogFactory;
import org.opensearch.indices.replication.SegmentReplicationSourceFactory;
import org.opensearch.indices.replication.SegmentReplicationTargetService;
import org.opensearch.indices.replication.SegmentReplicationSourceService;
Expand Down Expand Up @@ -228,6 +232,7 @@
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -662,6 +667,17 @@ protected Node(
repositoriesServiceReference::get
);

final BiFunction<IndexSettings, ShardRouting, TranslogFactory> translogFactorySupplier = (indexSettings, shardRouting) -> {
if (indexSettings.isRemoteTranslogStoreEnabled() && shardRouting.primary()) {
return new RemoteBlobStoreInternalTranslogFactory(
repositoriesServiceReference::get,
threadPool,
indexSettings.getRemoteStoreTranslogRepository()
);
}
return new InternalTranslogFactory();
};

final IndicesService indicesService;
if (FeatureFlags.isEnabled(FeatureFlags.EXTENSIONS)) {
indicesService = new IndicesService(
Expand All @@ -687,7 +703,7 @@ protected Node(
searchModule.getValuesSourceRegistry(),
recoveryStateFactories,
remoteDirectoryFactory,
repositoriesServiceReference::get
translogFactorySupplier
);
} else {
indicesService = new IndicesService(
Expand All @@ -712,7 +728,7 @@ protected Node(
searchModule.getValuesSourceRegistry(),
recoveryStateFactories,
remoteDirectoryFactory,
repositoriesServiceReference::get
translogFactorySupplier
);
}

Expand Down

0 comments on commit 7914084

Please sign in to comment.