From 03bb12dac873673d0454b83139fd4c7394fbecd1 Mon Sep 17 00:00:00 2001 From: Kartik Ganesh Date: Thu, 24 Mar 2022 14:30:33 -0700 Subject: [PATCH] Refactoring checkpoint publishing to no longer be a Broadcast Action This change adopts the same methodology as RetentionLeaseSyncer. A Transport client action is not required since we can skip the step of routing to the primary shard (the checkpoitn publisher is guaranteed to fire only from the primary). Now, checkpoint publishing directly invokes a TransportReplicationAction to perform the operation on the primary and replicas. PublishCheckpointAction has been reworked to be the TransportReplicationAction implementation rather than an ActionType. We leverage dependency injection to create the checkpoint publisher (and its internal action) at IndicesClusterStateService. This is plumed through to IndexShard which creates the refresh listener instance. All other transport layer classes tied to the original broadcast action are no longer required. Unrelated integration tests use a no-op/empty checkpoint publisher to satisfy their constructor/method argument. Signed-off-by: Kartik Ganesh --- .../opensearch/index/shard/IndexShardIT.java | 4 +- .../org/opensearch/action/ActionModule.java | 3 - .../opensearch/client/IndicesAdminClient.java | 8 - .../client/support/AbstractClient.java | 7 - .../org/opensearch/index/IndexModule.java | 7 +- .../org/opensearch/index/IndexService.java | 10 +- .../shard/CheckpointRefreshListener.java | 10 +- .../opensearch/index/shard/IndexShard.java | 4 +- .../org/opensearch/indices/IndicesModule.java | 4 +- .../opensearch/indices/IndicesService.java | 9 +- .../cluster/IndicesClusterStateService.java | 10 +- .../checkpoint/PublishCheckpointAction.java | 166 +++++++++++++++++- .../checkpoint/PublishCheckpointRequest.java | 10 +- ...SegmentReplicationCheckpointPublisher.java | 38 ++++ .../ShardPublishCheckpointRequest.java | 46 ----- .../TransportCheckpointPublisher.java | 42 ----- .../TransportPublishCheckpointAction.java | 68 ------- ...TransportPublishShardCheckpointAction.java | 103 ----------- .../opensearch/index/IndexModuleTests.java | 5 +- ...dicesLifecycleListenerSingleNodeTests.java | 8 +- ...actIndicesClusterStateServiceTestCase.java | 2 + ...ClusterStateServiceRandomUpdatesTests.java | 2 + .../snapshots/SnapshotResiliencyTests.java | 4 +- .../index/shard/IndexShardTestCase.java | 5 +- 24 files changed, 250 insertions(+), 325 deletions(-) create mode 100644 server/src/main/java/org/opensearch/indices/replication/checkpoint/SegmentReplicationCheckpointPublisher.java delete mode 100644 server/src/main/java/org/opensearch/indices/replication/checkpoint/ShardPublishCheckpointRequest.java delete mode 100644 server/src/main/java/org/opensearch/indices/replication/checkpoint/TransportCheckpointPublisher.java delete mode 100644 server/src/main/java/org/opensearch/indices/replication/checkpoint/TransportPublishCheckpointAction.java delete mode 100644 server/src/main/java/org/opensearch/indices/replication/checkpoint/TransportPublishShardCheckpointAction.java diff --git a/server/src/internalClusterTest/java/org/opensearch/index/shard/IndexShardIT.java b/server/src/internalClusterTest/java/org/opensearch/index/shard/IndexShardIT.java index e50e04cf94a6b..7b3baf996f47a 100644 --- a/server/src/internalClusterTest/java/org/opensearch/index/shard/IndexShardIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/index/shard/IndexShardIT.java @@ -84,7 +84,7 @@ import org.opensearch.indices.IndicesService; import org.opensearch.indices.breaker.CircuitBreakerService; import org.opensearch.indices.recovery.RecoveryState; -import org.opensearch.indices.replication.checkpoint.TransportCheckpointPublisher; +import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher; import org.opensearch.plugins.Plugin; import org.opensearch.search.builder.SearchSourceBuilder; import org.opensearch.test.DummyShardLock; @@ -675,7 +675,7 @@ public final IndexShard newIndexShard( () -> {}, RetentionLeaseSyncer.EMPTY, cbs, - new TransportCheckpointPublisher(client()) + SegmentReplicationCheckpointPublisher.EMPTY ); } diff --git a/server/src/main/java/org/opensearch/action/ActionModule.java b/server/src/main/java/org/opensearch/action/ActionModule.java index 752f02d03cb16..8e31aa23d88cf 100644 --- a/server/src/main/java/org/opensearch/action/ActionModule.java +++ b/server/src/main/java/org/opensearch/action/ActionModule.java @@ -268,8 +268,6 @@ import org.opensearch.index.seqno.RetentionLeaseActions; import org.opensearch.indices.SystemIndices; import org.opensearch.indices.breaker.CircuitBreakerService; -import org.opensearch.indices.replication.checkpoint.PublishCheckpointAction; -import org.opensearch.indices.replication.checkpoint.TransportPublishCheckpointAction; import org.opensearch.persistent.CompletionPersistentTaskAction; import org.opensearch.persistent.RemovePersistentTaskAction; import org.opensearch.persistent.StartPersistentTaskAction; @@ -588,7 +586,6 @@ public void reg actions.register(SimulateTemplateAction.INSTANCE, TransportSimulateTemplateAction.class); actions.register(ValidateQueryAction.INSTANCE, TransportValidateQueryAction.class); actions.register(RefreshAction.INSTANCE, TransportRefreshAction.class); - actions.register(PublishCheckpointAction.INSTANCE, TransportPublishCheckpointAction.class); actions.register(FlushAction.INSTANCE, TransportFlushAction.class); actions.register(ForceMergeAction.INSTANCE, TransportForceMergeAction.class); actions.register(UpgradeAction.INSTANCE, TransportUpgradeAction.class); diff --git a/server/src/main/java/org/opensearch/client/IndicesAdminClient.java b/server/src/main/java/org/opensearch/client/IndicesAdminClient.java index 2710035dce906..7f51b8af19e4b 100644 --- a/server/src/main/java/org/opensearch/client/IndicesAdminClient.java +++ b/server/src/main/java/org/opensearch/client/IndicesAdminClient.java @@ -132,7 +132,6 @@ import org.opensearch.action.support.master.AcknowledgedResponse; import org.opensearch.cluster.metadata.IndexMetadata.APIBlock; import org.opensearch.common.Nullable; -import org.opensearch.indices.replication.checkpoint.PublishCheckpointRequest; /** * Administrative actions/operations against indices. @@ -403,13 +402,6 @@ public interface IndicesAdminClient extends OpenSearchClient { */ void refresh(RefreshRequest request, ActionListener listener); - /** - * Publish the latest primary checkpoint to replica shards. - * @param request {@link PublishCheckpointRequest} The PublishCheckpointRequest - * @param listener A listener to be notified with a result - */ - void publishCheckpoint(PublishCheckpointRequest request, ActionListener listener); - /** * Explicitly refresh one or more indices (making the content indexed since the last refresh searchable). */ diff --git a/server/src/main/java/org/opensearch/client/support/AbstractClient.java b/server/src/main/java/org/opensearch/client/support/AbstractClient.java index 79ad5be58cb78..a37d293ee5dd2 100644 --- a/server/src/main/java/org/opensearch/client/support/AbstractClient.java +++ b/server/src/main/java/org/opensearch/client/support/AbstractClient.java @@ -368,8 +368,6 @@ import org.opensearch.common.settings.Settings; import org.opensearch.common.util.concurrent.ThreadContext; import org.opensearch.common.xcontent.XContentType; -import org.opensearch.indices.replication.checkpoint.PublishCheckpointAction; -import org.opensearch.indices.replication.checkpoint.PublishCheckpointRequest; import org.opensearch.tasks.TaskId; import org.opensearch.threadpool.ThreadPool; @@ -1610,11 +1608,6 @@ public void refresh(final RefreshRequest request, final ActionListener listener) { - execute(PublishCheckpointAction.INSTANCE, request, listener); - } - @Override public RefreshRequestBuilder prepareRefresh(String... indices) { return new RefreshRequestBuilder(this, RefreshAction.INSTANCE).setIndices(indices); diff --git a/server/src/main/java/org/opensearch/index/IndexModule.java b/server/src/main/java/org/opensearch/index/IndexModule.java index d2f53fe28e811..61f6c9689308f 100644 --- a/server/src/main/java/org/opensearch/index/IndexModule.java +++ b/server/src/main/java/org/opensearch/index/IndexModule.java @@ -75,7 +75,6 @@ import org.opensearch.indices.fielddata.cache.IndicesFieldDataCache; import org.opensearch.indices.mapper.MapperRegistry; import org.opensearch.indices.recovery.RecoveryState; -import org.opensearch.indices.replication.checkpoint.TransportCheckpointPublisher; import org.opensearch.plugins.IndexStorePlugin; import org.opensearch.script.ScriptService; import org.opensearch.search.aggregations.support.ValuesSourceRegistry; @@ -467,8 +466,7 @@ public IndexService newIndexService( IndicesFieldDataCache indicesFieldDataCache, NamedWriteableRegistry namedWriteableRegistry, BooleanSupplier idFieldDataEnabled, - ValuesSourceRegistry valuesSourceRegistry, - TransportCheckpointPublisher checkpointPublisher + ValuesSourceRegistry valuesSourceRegistry ) throws IOException { final IndexEventListener eventListener = freeze(); Function> readerWrapperFactory = indexReaderWrapper @@ -522,8 +520,7 @@ public IndexService newIndexService( allowExpensiveQueries, expressionResolver, valuesSourceRegistry, - recoveryStateFactory, - checkpointPublisher + recoveryStateFactory ); success = true; return indexService; diff --git a/server/src/main/java/org/opensearch/index/IndexService.java b/server/src/main/java/org/opensearch/index/IndexService.java index 9bf0c8da9ccc8..fdbd0a4cf8471 100644 --- a/server/src/main/java/org/opensearch/index/IndexService.java +++ b/server/src/main/java/org/opensearch/index/IndexService.java @@ -94,7 +94,7 @@ import org.opensearch.indices.fielddata.cache.IndicesFieldDataCache; import org.opensearch.indices.mapper.MapperRegistry; import org.opensearch.indices.recovery.RecoveryState; -import org.opensearch.indices.replication.checkpoint.TransportCheckpointPublisher; +import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher; import org.opensearch.plugins.IndexStorePlugin; import org.opensearch.script.ScriptService; import org.opensearch.search.aggregations.support.ValuesSourceRegistry; @@ -166,7 +166,6 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust private final IndexNameExpressionResolver expressionResolver; private final Supplier indexSortSupplier; private final ValuesSourceRegistry valuesSourceRegistry; - private final TransportCheckpointPublisher checkpointPublisher; public IndexService( IndexSettings indexSettings, @@ -197,8 +196,7 @@ public IndexService( BooleanSupplier allowExpensiveQueries, IndexNameExpressionResolver expressionResolver, ValuesSourceRegistry valuesSourceRegistry, - IndexStorePlugin.RecoveryStateFactory recoveryStateFactory, - TransportCheckpointPublisher checkpointPublisher + IndexStorePlugin.RecoveryStateFactory recoveryStateFactory ) { super(indexSettings); this.allowExpensiveQueries = allowExpensiveQueries; @@ -209,7 +207,6 @@ public IndexService( this.circuitBreakerService = circuitBreakerService; this.expressionResolver = expressionResolver; this.valuesSourceRegistry = valuesSourceRegistry; - this.checkpointPublisher = checkpointPublisher; if (needsMapperService(indexSettings, indexCreationContext)) { assert indexAnalyzers != null; this.mapperService = new MapperService( @@ -422,7 +419,8 @@ private long getAvgShardSizeInBytes() throws IOException { public synchronized IndexShard createShard( final ShardRouting routing, final Consumer globalCheckpointSyncer, - final RetentionLeaseSyncer retentionLeaseSyncer + final RetentionLeaseSyncer retentionLeaseSyncer, + final SegmentReplicationCheckpointPublisher checkpointPublisher ) throws IOException { Objects.requireNonNull(retentionLeaseSyncer); /* diff --git a/server/src/main/java/org/opensearch/index/shard/CheckpointRefreshListener.java b/server/src/main/java/org/opensearch/index/shard/CheckpointRefreshListener.java index 1c92396c167ad..ecfc79a31a15f 100644 --- a/server/src/main/java/org/opensearch/index/shard/CheckpointRefreshListener.java +++ b/server/src/main/java/org/opensearch/index/shard/CheckpointRefreshListener.java @@ -11,7 +11,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.lucene.search.ReferenceManager; -import org.opensearch.indices.replication.checkpoint.TransportCheckpointPublisher; +import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher; import java.io.IOException; @@ -24,9 +24,9 @@ public class CheckpointRefreshListener implements ReferenceManager.RefreshListen protected static Logger logger = LogManager.getLogger(CheckpointRefreshListener.class); private final IndexShard shard; - private final TransportCheckpointPublisher publisher; + private final SegmentReplicationCheckpointPublisher publisher; - public CheckpointRefreshListener(IndexShard shard, TransportCheckpointPublisher publisher) { + public CheckpointRefreshListener(IndexShard shard, SegmentReplicationCheckpointPublisher publisher) { this.shard = shard; this.publisher = publisher; } @@ -38,8 +38,8 @@ public void beforeRefresh() throws IOException { @Override public void afterRefresh(boolean didRefresh) throws IOException { - if (shard.routingEntry().primary()) { - publisher.publish(shard.getLatestReplicationCheckpoint()); + if (didRefresh) { + publisher.publish(shard); } } } diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index a792a8914c817..6ae827b553519 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -161,7 +161,7 @@ import org.opensearch.indices.recovery.RecoveryTarget; import org.opensearch.indices.replication.SegmentReplicationReplicaService; import org.opensearch.indices.replication.checkpoint.PublishCheckpointRequest; -import org.opensearch.indices.replication.checkpoint.TransportCheckpointPublisher; +import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher; import org.opensearch.indices.replication.copy.PrimaryShardReplicationSource; import org.opensearch.indices.replication.copy.ReplicationCheckpoint; import org.opensearch.indices.replication.copy.ReplicationFailedException; @@ -327,7 +327,7 @@ public IndexShard( final Runnable globalCheckpointSyncer, final RetentionLeaseSyncer retentionLeaseSyncer, final CircuitBreakerService circuitBreakerService, - final TransportCheckpointPublisher checkpointPublisher + final SegmentReplicationCheckpointPublisher checkpointPublisher ) throws IOException { super(shardRouting.shardId(), indexSettings); assert shardRouting.initializing(); diff --git a/server/src/main/java/org/opensearch/indices/IndicesModule.java b/server/src/main/java/org/opensearch/indices/IndicesModule.java index 50ead5519d574..5efab2d3633ab 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesModule.java +++ b/server/src/main/java/org/opensearch/indices/IndicesModule.java @@ -73,7 +73,7 @@ import org.opensearch.index.shard.PrimaryReplicaSyncer; import org.opensearch.indices.cluster.IndicesClusterStateService; import org.opensearch.indices.mapper.MapperRegistry; -import org.opensearch.indices.replication.checkpoint.TransportPublishShardCheckpointAction; +import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher; import org.opensearch.indices.store.IndicesStore; import org.opensearch.indices.store.TransportNodesListShardStoreMetadata; import org.opensearch.plugins.MapperPlugin; @@ -276,8 +276,8 @@ protected void configure() { bind(PrimaryReplicaSyncer.class).asEagerSingleton(); bind(RetentionLeaseSyncAction.class).asEagerSingleton(); bind(RetentionLeaseBackgroundSyncAction.class).asEagerSingleton(); - bind(TransportPublishShardCheckpointAction.class).asEagerSingleton(); bind(RetentionLeaseSyncer.class).asEagerSingleton(); + bind(SegmentReplicationCheckpointPublisher.class).asEagerSingleton(); } /** diff --git a/server/src/main/java/org/opensearch/indices/IndicesService.java b/server/src/main/java/org/opensearch/indices/IndicesService.java index b85a65c7e67be..4d5b22865a2a0 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesService.java +++ b/server/src/main/java/org/opensearch/indices/IndicesService.java @@ -138,7 +138,7 @@ import org.opensearch.indices.recovery.PeerRecoveryTargetService; import org.opensearch.indices.recovery.RecoveryState; import org.opensearch.indices.replication.SegmentReplicationReplicaService; -import org.opensearch.indices.replication.checkpoint.TransportCheckpointPublisher; +import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher; import org.opensearch.indices.replication.copy.PrimaryShardReplicationSource; import org.opensearch.node.Node; import org.opensearch.plugins.IndexStorePlugin; @@ -740,8 +740,7 @@ private synchronized IndexService createIndexService( indicesFieldDataCache, namedWriteableRegistry, this::isIdFieldDataEnabled, - valuesSourceRegistry, - new TransportCheckpointPublisher(client) + valuesSourceRegistry ); } @@ -840,6 +839,7 @@ public IndexShard createShard( final SegmentReplicationReplicaService segmentReplicationReplicaService, final SegmentReplicationReplicaService.SegmentReplicationListener segRepListener, final PrimaryShardReplicationSource replicationSource, + final SegmentReplicationCheckpointPublisher checkpointPublisher, final PeerRecoveryTargetService recoveryTargetService, final PeerRecoveryTargetService.RecoveryListener recoveryListener, final RepositoriesService repositoriesService, @@ -850,11 +850,12 @@ public IndexShard createShard( final DiscoveryNode sourceNode ) throws IOException { Objects.requireNonNull(retentionLeaseSyncer); + Objects.requireNonNull(checkpointPublisher); ensureChangesAllowed(); IndexService indexService = indexService(shardRouting.index()); assert indexService != null; RecoveryState recoveryState = indexService.createRecoveryState(shardRouting, targetNode, sourceNode); - IndexShard indexShard = indexService.createShard(shardRouting, globalCheckpointSyncer, retentionLeaseSyncer); + IndexShard indexShard = indexService.createShard(shardRouting, globalCheckpointSyncer, retentionLeaseSyncer, checkpointPublisher); indexShard.addShardFailureCallback(onShardFailure); indexShard.startRecovery( recoveryState, diff --git a/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java b/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java index e09add708b14c..019329ecb2801 100644 --- a/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java +++ b/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java @@ -81,6 +81,7 @@ import org.opensearch.indices.recovery.RecoveryFailedException; import org.opensearch.indices.recovery.RecoveryState; import org.opensearch.indices.replication.SegmentReplicationReplicaService; +import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher; import org.opensearch.indices.replication.copy.PrimaryShardReplicationSource; import org.opensearch.indices.replication.copy.ReplicationFailedException; import org.opensearch.indices.replication.copy.SegmentReplicationState; @@ -138,6 +139,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple private final PrimaryReplicaSyncer primaryReplicaSyncer; private final Consumer globalCheckpointSyncer; private final RetentionLeaseSyncer retentionLeaseSyncer; + private final SegmentReplicationCheckpointPublisher checkpointPublisher; @Inject public IndicesClusterStateService( @@ -156,7 +158,8 @@ public IndicesClusterStateService( final GlobalCheckpointSyncAction globalCheckpointSyncAction, final RetentionLeaseSyncer retentionLeaseSyncer, final SegmentReplicationReplicaService replicationReplicaService, - final PrimaryShardReplicationSource replicationSource + final PrimaryShardReplicationSource replicationSource, + final SegmentReplicationCheckpointPublisher checkpointPublisher ) { this( settings, @@ -165,6 +168,7 @@ public IndicesClusterStateService( threadPool, replicationReplicaService, replicationSource, + checkpointPublisher, recoveryTargetService, shardStateAction, nodeMappingRefreshAction, @@ -186,6 +190,7 @@ public IndicesClusterStateService( final ThreadPool threadPool, final SegmentReplicationReplicaService replicationReplicaService, final PrimaryShardReplicationSource replicationSource, + final SegmentReplicationCheckpointPublisher checkpointPublisher, final PeerRecoveryTargetService recoveryTargetService, final ShardStateAction shardStateAction, final NodeMappingRefreshAction nodeMappingRefreshAction, @@ -200,6 +205,7 @@ public IndicesClusterStateService( this.settings = settings; this.segmentReplicationReplicaService = replicationReplicaService; this.replicationSource = replicationSource; + this.checkpointPublisher = checkpointPublisher; this.buildInIndexListener = Arrays.asList(peerRecoverySourceService, recoveryTargetService, searchService, snapshotShardsService); this.indicesService = indicesService; this.clusterService = clusterService; @@ -632,6 +638,7 @@ private void createShard(DiscoveryNodes nodes, RoutingTable routingTable, ShardR segmentReplicationReplicaService, new ShardRoutingReplicationListener(shardRouting, primaryTerm), replicationSource, + checkpointPublisher, recoveryTargetService, new ShardRoutingRecoveryListener(shardRouting, primaryTerm), repositoriesService, @@ -1030,6 +1037,7 @@ T createShard( SegmentReplicationReplicaService replicaService, SegmentReplicationReplicaService.SegmentReplicationListener segRepListener, PrimaryShardReplicationSource replicationSource, + SegmentReplicationCheckpointPublisher checkpointPublisher, PeerRecoveryTargetService recoveryTargetService, PeerRecoveryTargetService.RecoveryListener recoveryListener, RepositoriesService repositoriesService, diff --git a/server/src/main/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointAction.java b/server/src/main/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointAction.java index ece59432f4a37..b655edd8c6d79 100644 --- a/server/src/main/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointAction.java +++ b/server/src/main/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointAction.java @@ -8,14 +8,166 @@ package org.opensearch.indices.replication.checkpoint; -import org.opensearch.action.ActionType; -import org.opensearch.action.admin.indices.refresh.RefreshResponse; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.apache.lucene.store.AlreadyClosedException; +import org.opensearch.ExceptionsHelper; +import org.opensearch.action.ActionListener; +import org.opensearch.action.support.ActionFilters; +import org.opensearch.action.support.replication.ReplicationResponse; +import org.opensearch.action.support.replication.ReplicationTask; +import org.opensearch.action.support.replication.TransportReplicationAction; +import org.opensearch.cluster.action.shard.ShardStateAction; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.inject.Inject; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.concurrent.ThreadContext; +import org.opensearch.index.IndexNotFoundException; +import org.opensearch.index.shard.IndexShard; +import org.opensearch.index.shard.IndexShardClosedException; +import org.opensearch.indices.IndicesService; +import org.opensearch.indices.replication.SegmentReplicationReplicaService; +import org.opensearch.indices.replication.copy.PrimaryShardReplicationSource; +import org.opensearch.node.NodeClosedException; +import org.opensearch.tasks.Task; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.transport.TransportException; +import org.opensearch.transport.TransportResponseHandler; +import org.opensearch.transport.TransportService; -public class PublishCheckpointAction extends ActionType { - public static final PublishCheckpointAction INSTANCE = new PublishCheckpointAction(); - public static final String NAME = "indices:admin/publishCheckpoint"; +import java.io.IOException; +import java.util.Objects; - private PublishCheckpointAction() { - super(NAME, RefreshResponse::new); +public class PublishCheckpointAction extends TransportReplicationAction< + PublishCheckpointRequest, + PublishCheckpointRequest, + ReplicationResponse> { + + public static final String ACTION_NAME = "indices:admin/publishCheckpoint"; + protected static Logger logger = LogManager.getLogger(PublishCheckpointAction.class); + + private final SegmentReplicationReplicaService replicationService; + private final PrimaryShardReplicationSource source; + + @Inject + public PublishCheckpointAction( + Settings settings, + TransportService transportService, + ClusterService clusterService, + IndicesService indicesService, + ThreadPool threadPool, + ShardStateAction shardStateAction, + ActionFilters actionFilters, + SegmentReplicationReplicaService segmentCopyService, + PrimaryShardReplicationSource source + ) { + super( + settings, + ACTION_NAME, + transportService, + clusterService, + indicesService, + threadPool, + shardStateAction, + actionFilters, + PublishCheckpointRequest::new, + PublishCheckpointRequest::new, + ThreadPool.Names.REFRESH + ); + this.replicationService = segmentCopyService; + this.source = source; + } + + @Override + protected ReplicationResponse newResponseInstance(StreamInput in) throws IOException { + return new ReplicationResponse(in); + } + + @Override + protected void doExecute(Task task, PublishCheckpointRequest request, ActionListener listener) { + assert false : "use PublishCheckpointAction#publish"; + } + + final void publish(IndexShard indexShard) { + String primaryAllocationId = indexShard.routingEntry().allocationId().getId(); + long primaryTerm = indexShard.getPendingPrimaryTerm(); + final ThreadContext threadContext = threadPool.getThreadContext(); + try (ThreadContext.StoredContext ignore = threadContext.stashContext()) { + // we have to execute under the system context so that if security is enabled the sync is authorized + threadContext.markAsSystemContext(); + PublishCheckpointRequest request = new PublishCheckpointRequest(indexShard.getLatestReplicationCheckpoint()); + final ReplicationTask task = (ReplicationTask) taskManager.register("transport", "segrep_publish_checkpoint", request); + transportService.sendChildRequest( + clusterService.localNode(), + transportPrimaryAction, + new ConcreteShardRequest<>(request, primaryAllocationId, primaryTerm), + task, + transportOptions, + new TransportResponseHandler() { + @Override + public ReplicationResponse read(StreamInput in) throws IOException { + return newResponseInstance(in); + } + + @Override + public String executor() { + return ThreadPool.Names.SAME; + } + + @Override + public void handleResponse(ReplicationResponse response) { + task.setPhase("finished"); + taskManager.unregister(task); + } + + @Override + public void handleException(TransportException e) { + task.setPhase("finished"); + taskManager.unregister(task); + if (ExceptionsHelper.unwrap(e, NodeClosedException.class) != null) { + // node shutting down + return; + } + if (ExceptionsHelper.unwrap( + e, + IndexNotFoundException.class, + AlreadyClosedException.class, + IndexShardClosedException.class + ) != null) { + // the index was deleted or the shard is closed + return; + } + logger.warn( + new ParameterizedMessage("{} segment replication checkpoint publishing failed", indexShard.shardId()), + e + ); + } + } + ); + } + } + + @Override + protected void shardOperationOnPrimary( + PublishCheckpointRequest request, + IndexShard primary, + ActionListener> listener + ) { + ActionListener.completeWith(listener, () -> new PrimaryResult<>(request, new ReplicationResponse())); + } + + @Override + protected void shardOperationOnReplica(PublishCheckpointRequest request, IndexShard replica, ActionListener listener) { + Objects.requireNonNull(request); + Objects.requireNonNull(replica); + ActionListener.completeWith(listener, () -> { + logger.trace("Checkpoint received on replica {}", request); + if (request.getCheckpoint().getShardId().equals(replica.shardId())) { + replica.onNewCheckpoint(request, source, replicationService); + } + return new ReplicaResult(); + }); } } diff --git a/server/src/main/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointRequest.java b/server/src/main/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointRequest.java index 61a508f50d574..a51a234e3d9de 100644 --- a/server/src/main/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointRequest.java +++ b/server/src/main/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointRequest.java @@ -8,19 +8,19 @@ package org.opensearch.indices.replication.checkpoint; -import org.opensearch.action.support.broadcast.BroadcastRequest; +import org.opensearch.action.support.replication.ReplicationRequest; import org.opensearch.common.io.stream.StreamInput; import org.opensearch.common.io.stream.StreamOutput; import org.opensearch.indices.replication.copy.ReplicationCheckpoint; import java.io.IOException; -public class PublishCheckpointRequest extends BroadcastRequest { +public class PublishCheckpointRequest extends ReplicationRequest { - private ReplicationCheckpoint checkpoint; + private final ReplicationCheckpoint checkpoint; - public PublishCheckpointRequest(ReplicationCheckpoint checkpoint, String... indices) { - super(indices); + public PublishCheckpointRequest(ReplicationCheckpoint checkpoint) { + super(checkpoint.getShardId()); this.checkpoint = checkpoint; } diff --git a/server/src/main/java/org/opensearch/indices/replication/checkpoint/SegmentReplicationCheckpointPublisher.java b/server/src/main/java/org/opensearch/indices/replication/checkpoint/SegmentReplicationCheckpointPublisher.java new file mode 100644 index 0000000000000..750213eb0c086 --- /dev/null +++ b/server/src/main/java/org/opensearch/indices/replication/checkpoint/SegmentReplicationCheckpointPublisher.java @@ -0,0 +1,38 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.indices.replication.checkpoint; + +import org.opensearch.common.inject.Inject; +import org.opensearch.index.shard.IndexShard; + +import java.util.Objects; + +public class SegmentReplicationCheckpointPublisher { + + private final PublishAction publishAction; + + @Inject + public SegmentReplicationCheckpointPublisher(PublishCheckpointAction publishAction) { + this(publishAction::publish); + } + + public SegmentReplicationCheckpointPublisher(PublishAction publishAction) { + this.publishAction = Objects.requireNonNull(publishAction); + } + + public void publish(IndexShard indexShard) { + publishAction.publish(indexShard); + } + + public interface PublishAction { + void publish(IndexShard indexShard); + } + + public static final SegmentReplicationCheckpointPublisher EMPTY = new SegmentReplicationCheckpointPublisher(indexShard -> {}); +} diff --git a/server/src/main/java/org/opensearch/indices/replication/checkpoint/ShardPublishCheckpointRequest.java b/server/src/main/java/org/opensearch/indices/replication/checkpoint/ShardPublishCheckpointRequest.java deleted file mode 100644 index 61bfac16812b3..0000000000000 --- a/server/src/main/java/org/opensearch/indices/replication/checkpoint/ShardPublishCheckpointRequest.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.indices.replication.checkpoint; - -import org.opensearch.action.support.replication.ReplicationRequest; -import org.opensearch.common.io.stream.StreamInput; -import org.opensearch.common.io.stream.StreamOutput; -import org.opensearch.index.shard.ShardId; - -import java.io.IOException; - -public class ShardPublishCheckpointRequest extends ReplicationRequest { - - private final PublishCheckpointRequest request; - - public ShardPublishCheckpointRequest(PublishCheckpointRequest request, ShardId shardId) { - super(shardId); - this.request = request; - } - - public ShardPublishCheckpointRequest(StreamInput in) throws IOException { - super(in); - this.request = new PublishCheckpointRequest(in); - } - - PublishCheckpointRequest getRequest() { - return request; - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); - request.writeTo(out); - } - - @Override - public String toString() { - return "ShardPublishCheckpointRequest{" + shardId + "}"; - } -} diff --git a/server/src/main/java/org/opensearch/indices/replication/checkpoint/TransportCheckpointPublisher.java b/server/src/main/java/org/opensearch/indices/replication/checkpoint/TransportCheckpointPublisher.java deleted file mode 100644 index b512b1d201499..0000000000000 --- a/server/src/main/java/org/opensearch/indices/replication/checkpoint/TransportCheckpointPublisher.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.indices.replication.checkpoint; - -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.opensearch.action.ActionListener; -import org.opensearch.action.admin.indices.refresh.RefreshResponse; -import org.opensearch.client.Client; -import org.opensearch.indices.replication.copy.ReplicationCheckpoint; - -public class TransportCheckpointPublisher { - - protected static Logger logger = LogManager.getLogger(TransportCheckpointPublisher.class); - - private final Client client; - - public TransportCheckpointPublisher(Client client) { - this.client = client; - } - - public void publish(ReplicationCheckpoint checkpoint) { - logger.trace("Publishing Checkpoint {}", checkpoint); - client.admin().indices().publishCheckpoint(new PublishCheckpointRequest(checkpoint), new ActionListener() { - @Override - public void onResponse(RefreshResponse response) { - logger.trace("Successfully published checkpoints"); - } - - @Override - public void onFailure(Exception e) { - logger.error("Publishing Checkpoints from primary to replicas failed", e); - } - }); - } -} diff --git a/server/src/main/java/org/opensearch/indices/replication/checkpoint/TransportPublishCheckpointAction.java b/server/src/main/java/org/opensearch/indices/replication/checkpoint/TransportPublishCheckpointAction.java deleted file mode 100644 index 3d66b041eae14..0000000000000 --- a/server/src/main/java/org/opensearch/indices/replication/checkpoint/TransportPublishCheckpointAction.java +++ /dev/null @@ -1,68 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.indices.replication.checkpoint; - -import org.opensearch.action.admin.indices.refresh.RefreshResponse; -import org.opensearch.action.support.ActionFilters; -import org.opensearch.action.support.DefaultShardOperationFailedException; -import org.opensearch.action.support.replication.ReplicationResponse; -import org.opensearch.action.support.replication.TransportBroadcastReplicationAction; -import org.opensearch.cluster.metadata.IndexNameExpressionResolver; -import org.opensearch.cluster.service.ClusterService; -import org.opensearch.common.inject.Inject; -import org.opensearch.index.shard.ShardId; -import org.opensearch.transport.TransportService; - -import java.util.List; - -public class TransportPublishCheckpointAction extends TransportBroadcastReplicationAction< - PublishCheckpointRequest, - RefreshResponse, - ShardPublishCheckpointRequest, - ReplicationResponse> { - - @Inject - public TransportPublishCheckpointAction( - ClusterService clusterService, - TransportService transportService, - ActionFilters actionFilters, - IndexNameExpressionResolver indexNameExpressionResolver, - TransportPublishShardCheckpointAction shardCheckpointAction - ) { - super( - PublishCheckpointAction.NAME, - PublishCheckpointRequest::new, - clusterService, - transportService, - actionFilters, - indexNameExpressionResolver, - shardCheckpointAction - ); - } - - @Override - protected ReplicationResponse newShardResponse() { - return new ReplicationResponse(); - } - - @Override - protected ShardPublishCheckpointRequest newShardRequest(PublishCheckpointRequest request, ShardId shardId) { - return new ShardPublishCheckpointRequest(request, shardId); - } - - @Override - protected RefreshResponse newResponse( - int successfulShards, - int failedShards, - int totalNumCopies, - List shardFailures - ) { - return new RefreshResponse(totalNumCopies, successfulShards, failedShards, shardFailures); - } -} diff --git a/server/src/main/java/org/opensearch/indices/replication/checkpoint/TransportPublishShardCheckpointAction.java b/server/src/main/java/org/opensearch/indices/replication/checkpoint/TransportPublishShardCheckpointAction.java deleted file mode 100644 index c0f55bb70f325..0000000000000 --- a/server/src/main/java/org/opensearch/indices/replication/checkpoint/TransportPublishShardCheckpointAction.java +++ /dev/null @@ -1,103 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.indices.replication.checkpoint; - -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.opensearch.action.ActionListener; -import org.opensearch.action.support.ActionFilters; -import org.opensearch.action.support.replication.ReplicationResponse; -import org.opensearch.action.support.replication.TransportReplicationAction; -import org.opensearch.cluster.action.shard.ShardStateAction; -import org.opensearch.cluster.service.ClusterService; -import org.opensearch.common.inject.Inject; -import org.opensearch.common.io.stream.StreamInput; -import org.opensearch.common.settings.Settings; -import org.opensearch.index.shard.IndexShard; -import org.opensearch.indices.IndicesService; -import org.opensearch.indices.replication.SegmentReplicationReplicaService; -import org.opensearch.indices.replication.copy.PrimaryShardReplicationSource; -import org.opensearch.threadpool.ThreadPool; -import org.opensearch.transport.TransportService; - -import java.io.IOException; - -public class TransportPublishShardCheckpointAction extends TransportReplicationAction< - ShardPublishCheckpointRequest, - ShardPublishCheckpointRequest, - ReplicationResponse> { - - protected static Logger logger = LogManager.getLogger(TransportPublishShardCheckpointAction.class); - - public static final String ACTION_NAME = PublishCheckpointAction.NAME + "[s]"; - - private final SegmentReplicationReplicaService replicationService; - private final PrimaryShardReplicationSource source; - - @Inject - public TransportPublishShardCheckpointAction( - Settings settings, - TransportService transportService, - ClusterService clusterService, - IndicesService indicesService, - ThreadPool threadPool, - ShardStateAction shardStateAction, - ActionFilters actionFilters, - SegmentReplicationReplicaService segmentCopyService, - PrimaryShardReplicationSource source - ) { - super( - settings, - ACTION_NAME, - transportService, - clusterService, - indicesService, - threadPool, - shardStateAction, - actionFilters, - ShardPublishCheckpointRequest::new, - ShardPublishCheckpointRequest::new, - ThreadPool.Names.SNAPSHOT - ); - this.replicationService = segmentCopyService; - this.source = source; - } - - @Override - protected ReplicationResponse newResponseInstance(StreamInput in) throws IOException { - return new ReplicationResponse(in); - } - - @Override - protected void shardOperationOnPrimary( - ShardPublishCheckpointRequest shardRequest, - IndexShard primary, - ActionListener> listener - ) { - ActionListener.completeWith(listener, () -> new PrimaryResult<>(shardRequest, new ReplicationResponse())); - } - - @Override - protected void shardOperationOnReplica( - ShardPublishCheckpointRequest shardRequest, - IndexShard replica, - ActionListener listener - ) { - ActionListener.completeWith(listener, () -> { - PublishCheckpointRequest request = shardRequest.getRequest(); - logger.trace("Checkpoint received on replica {}", request); - if (request.getCheckpoint().getShardId().equals(replica.shardId())) { - replica.onNewCheckpoint(request, source, replicationService); - } - // TODO: Segrep - These requests are getting routed to all shards across all indices. - // We should only publish to replicas of the updated index. - return new ReplicaResult(); - }); - } -} diff --git a/server/src/test/java/org/opensearch/index/IndexModuleTests.java b/server/src/test/java/org/opensearch/index/IndexModuleTests.java index b1b33e03c4b18..db69690f1df00 100644 --- a/server/src/test/java/org/opensearch/index/IndexModuleTests.java +++ b/server/src/test/java/org/opensearch/index/IndexModuleTests.java @@ -97,7 +97,6 @@ import org.opensearch.indices.fielddata.cache.IndicesFieldDataCache; import org.opensearch.indices.mapper.MapperRegistry; import org.opensearch.indices.recovery.RecoveryState; -import org.opensearch.indices.replication.checkpoint.TransportCheckpointPublisher; import org.opensearch.plugins.IndexStorePlugin; import org.opensearch.script.ScriptService; import org.opensearch.search.internal.ReaderContext; @@ -196,7 +195,6 @@ public void tearDown() throws Exception { private IndexService newIndexService(IndexModule module) throws IOException { NoOpClient noOpClient = new NoOpClient(this.getTestName()); - TransportCheckpointPublisher checkpointPublisher = new TransportCheckpointPublisher(noOpClient); return module.newIndexService( CREATE_INDEX, nodeEnvironment, @@ -213,8 +211,7 @@ private IndexService newIndexService(IndexModule module) throws IOException { new IndicesFieldDataCache(settings, listener), writableRegistry(), () -> false, - null, - checkpointPublisher + null ); } diff --git a/server/src/test/java/org/opensearch/indices/IndicesLifecycleListenerSingleNodeTests.java b/server/src/test/java/org/opensearch/indices/IndicesLifecycleListenerSingleNodeTests.java index 5f3d03f85f324..0989bf869f18e 100644 --- a/server/src/test/java/org/opensearch/indices/IndicesLifecycleListenerSingleNodeTests.java +++ b/server/src/test/java/org/opensearch/indices/IndicesLifecycleListenerSingleNodeTests.java @@ -49,6 +49,7 @@ import org.opensearch.index.shard.ShardId; import org.opensearch.indices.cluster.IndicesClusterStateService.AllocatedIndices.IndexRemovalReason; import org.opensearch.indices.recovery.RecoveryState; +import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher; import org.opensearch.test.OpenSearchSingleNodeTestCase; import java.util.Arrays; @@ -148,7 +149,12 @@ public void afterIndexRemoved(Index index, IndexSettings indexSettings, IndexRem newRouting = newRouting.moveToUnassigned(unassignedInfo) .updateUnassigned(unassignedInfo, RecoverySource.EmptyStoreRecoverySource.INSTANCE); newRouting = ShardRoutingHelper.initialize(newRouting, nodeId); - IndexShard shard = index.createShard(newRouting, s -> {}, RetentionLeaseSyncer.EMPTY); + IndexShard shard = index.createShard( + newRouting, + s -> {}, + RetentionLeaseSyncer.EMPTY, + SegmentReplicationCheckpointPublisher.EMPTY + ); IndexShardTestCase.updateRoutingEntry(shard, newRouting); assertEquals(5, counter.get()); final DiscoveryNode localNode = new DiscoveryNode( diff --git a/server/src/test/java/org/opensearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java b/server/src/test/java/org/opensearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java index 278c8c3408adc..94abc3d9fb44a 100644 --- a/server/src/test/java/org/opensearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java +++ b/server/src/test/java/org/opensearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java @@ -58,6 +58,7 @@ import org.opensearch.indices.recovery.PeerRecoveryTargetService; import org.opensearch.indices.recovery.RecoveryState; import org.opensearch.indices.replication.SegmentReplicationReplicaService; +import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher; import org.opensearch.indices.replication.copy.PrimaryShardReplicationSource; import org.opensearch.repositories.RepositoriesService; import org.opensearch.test.OpenSearchTestCase; @@ -257,6 +258,7 @@ public MockIndexShard createShard( final SegmentReplicationReplicaService replicaService, final SegmentReplicationReplicaService.SegmentReplicationListener segRepListener, final PrimaryShardReplicationSource replicationSource, + final SegmentReplicationCheckpointPublisher checkpointPublisher, final PeerRecoveryTargetService recoveryTargetService, final PeerRecoveryTargetService.RecoveryListener recoveryListener, final RepositoriesService repositoriesService, diff --git a/server/src/test/java/org/opensearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java b/server/src/test/java/org/opensearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java index 0f2ccda8291a1..bc502ec716d28 100644 --- a/server/src/test/java/org/opensearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java +++ b/server/src/test/java/org/opensearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java @@ -67,6 +67,7 @@ import org.opensearch.index.shard.ShardId; import org.opensearch.indices.recovery.PeerRecoveryTargetService; import org.opensearch.indices.replication.SegmentReplicationReplicaService; +import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher; import org.opensearch.indices.replication.copy.PrimaryShardReplicationSource; import org.opensearch.repositories.RepositoriesService; import org.opensearch.threadpool.TestThreadPool; @@ -578,6 +579,7 @@ private IndicesClusterStateService createIndicesClusterStateService( threadPool, replicationReplicaService, replicationSource, + SegmentReplicationCheckpointPublisher.EMPTY, recoveryTargetService, shardStateAction, null, diff --git a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java index ded22610afbd9..94d068d17fc50 100644 --- a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java @@ -183,6 +183,7 @@ import org.opensearch.indices.recovery.PeerRecoveryTargetService; import org.opensearch.indices.recovery.RecoverySettings; import org.opensearch.indices.replication.SegmentReplicationReplicaService; +import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher; import org.opensearch.indices.replication.copy.PrimaryShardReplicationSource; import org.opensearch.ingest.IngestService; import org.opensearch.monitor.StatusInfo; @@ -1869,7 +1870,8 @@ public void onFailure(final Exception e) { ), RetentionLeaseSyncer.EMPTY, replicaService, - new PrimaryShardReplicationSource(transportService, clusterService, indicesService, recoverySettings, replicaService) + new PrimaryShardReplicationSource(transportService, clusterService, indicesService, recoverySettings, replicaService), + SegmentReplicationCheckpointPublisher.EMPTY ); Map actions = new HashMap<>(); final ShardLimitValidator shardLimitValidator = new ShardLimitValidator(settings, clusterService); diff --git a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java index 81e3477725ca7..f2a95826b8844 100644 --- a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java @@ -93,7 +93,7 @@ import org.opensearch.indices.recovery.RecoveryState; import org.opensearch.indices.recovery.RecoveryTarget; import org.opensearch.indices.recovery.StartRecoveryRequest; -import org.opensearch.indices.replication.checkpoint.TransportCheckpointPublisher; +import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher; import org.opensearch.repositories.IndexId; import org.opensearch.repositories.Repository; import org.opensearch.repositories.blobstore.OpenSearchBlobStoreRepositoryIntegTestCase; @@ -461,7 +461,6 @@ protected IndexShard newShard( ); // No-op checkpoint publisher for backwards compatibliity NoOpClient noOpClient = new NoOpClient(this.getTestName()); - TransportCheckpointPublisher checkpointPublisher = new TransportCheckpointPublisher(noOpClient); indexShard = new IndexShard( routing, indexSettings, @@ -483,7 +482,7 @@ protected IndexShard newShard( globalCheckpointSyncer, retentionLeaseSyncer, breakerService, - checkpointPublisher + SegmentReplicationCheckpointPublisher.EMPTY ); indexShard.addShardFailureCallback(DEFAULT_SHARD_FAILURE_HANDLER); success = true;