diff --git a/server/src/internalClusterTest/java/org/opensearch/index/shard/IndexShardIT.java b/server/src/internalClusterTest/java/org/opensearch/index/shard/IndexShardIT.java index e50e04cf94a6b..7b3baf996f47a 100644 --- a/server/src/internalClusterTest/java/org/opensearch/index/shard/IndexShardIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/index/shard/IndexShardIT.java @@ -84,7 +84,7 @@ import org.opensearch.indices.IndicesService; import org.opensearch.indices.breaker.CircuitBreakerService; import org.opensearch.indices.recovery.RecoveryState; -import org.opensearch.indices.replication.checkpoint.TransportCheckpointPublisher; +import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher; import org.opensearch.plugins.Plugin; import org.opensearch.search.builder.SearchSourceBuilder; import org.opensearch.test.DummyShardLock; @@ -675,7 +675,7 @@ public final IndexShard newIndexShard( () -> {}, RetentionLeaseSyncer.EMPTY, cbs, - new TransportCheckpointPublisher(client()) + SegmentReplicationCheckpointPublisher.EMPTY ); } diff --git a/server/src/main/java/org/opensearch/action/ActionModule.java b/server/src/main/java/org/opensearch/action/ActionModule.java index 752f02d03cb16..8e31aa23d88cf 100644 --- a/server/src/main/java/org/opensearch/action/ActionModule.java +++ b/server/src/main/java/org/opensearch/action/ActionModule.java @@ -268,8 +268,6 @@ import org.opensearch.index.seqno.RetentionLeaseActions; import org.opensearch.indices.SystemIndices; import org.opensearch.indices.breaker.CircuitBreakerService; -import org.opensearch.indices.replication.checkpoint.PublishCheckpointAction; -import org.opensearch.indices.replication.checkpoint.TransportPublishCheckpointAction; import org.opensearch.persistent.CompletionPersistentTaskAction; import org.opensearch.persistent.RemovePersistentTaskAction; import org.opensearch.persistent.StartPersistentTaskAction; @@ -588,7 +586,6 @@ public void reg actions.register(SimulateTemplateAction.INSTANCE, TransportSimulateTemplateAction.class); actions.register(ValidateQueryAction.INSTANCE, TransportValidateQueryAction.class); actions.register(RefreshAction.INSTANCE, TransportRefreshAction.class); - actions.register(PublishCheckpointAction.INSTANCE, TransportPublishCheckpointAction.class); actions.register(FlushAction.INSTANCE, TransportFlushAction.class); actions.register(ForceMergeAction.INSTANCE, TransportForceMergeAction.class); actions.register(UpgradeAction.INSTANCE, TransportUpgradeAction.class); diff --git a/server/src/main/java/org/opensearch/client/IndicesAdminClient.java b/server/src/main/java/org/opensearch/client/IndicesAdminClient.java index 2710035dce906..7f51b8af19e4b 100644 --- a/server/src/main/java/org/opensearch/client/IndicesAdminClient.java +++ b/server/src/main/java/org/opensearch/client/IndicesAdminClient.java @@ -132,7 +132,6 @@ import org.opensearch.action.support.master.AcknowledgedResponse; import org.opensearch.cluster.metadata.IndexMetadata.APIBlock; import org.opensearch.common.Nullable; -import org.opensearch.indices.replication.checkpoint.PublishCheckpointRequest; /** * Administrative actions/operations against indices. @@ -403,13 +402,6 @@ public interface IndicesAdminClient extends OpenSearchClient { */ void refresh(RefreshRequest request, ActionListener listener); - /** - * Publish the latest primary checkpoint to replica shards. - * @param request {@link PublishCheckpointRequest} The PublishCheckpointRequest - * @param listener A listener to be notified with a result - */ - void publishCheckpoint(PublishCheckpointRequest request, ActionListener listener); - /** * Explicitly refresh one or more indices (making the content indexed since the last refresh searchable). */ diff --git a/server/src/main/java/org/opensearch/client/support/AbstractClient.java b/server/src/main/java/org/opensearch/client/support/AbstractClient.java index 79ad5be58cb78..a37d293ee5dd2 100644 --- a/server/src/main/java/org/opensearch/client/support/AbstractClient.java +++ b/server/src/main/java/org/opensearch/client/support/AbstractClient.java @@ -368,8 +368,6 @@ import org.opensearch.common.settings.Settings; import org.opensearch.common.util.concurrent.ThreadContext; import org.opensearch.common.xcontent.XContentType; -import org.opensearch.indices.replication.checkpoint.PublishCheckpointAction; -import org.opensearch.indices.replication.checkpoint.PublishCheckpointRequest; import org.opensearch.tasks.TaskId; import org.opensearch.threadpool.ThreadPool; @@ -1610,11 +1608,6 @@ public void refresh(final RefreshRequest request, final ActionListener listener) { - execute(PublishCheckpointAction.INSTANCE, request, listener); - } - @Override public RefreshRequestBuilder prepareRefresh(String... indices) { return new RefreshRequestBuilder(this, RefreshAction.INSTANCE).setIndices(indices); diff --git a/server/src/main/java/org/opensearch/index/IndexModule.java b/server/src/main/java/org/opensearch/index/IndexModule.java index d2f53fe28e811..61f6c9689308f 100644 --- a/server/src/main/java/org/opensearch/index/IndexModule.java +++ b/server/src/main/java/org/opensearch/index/IndexModule.java @@ -75,7 +75,6 @@ import org.opensearch.indices.fielddata.cache.IndicesFieldDataCache; import org.opensearch.indices.mapper.MapperRegistry; import org.opensearch.indices.recovery.RecoveryState; -import org.opensearch.indices.replication.checkpoint.TransportCheckpointPublisher; import org.opensearch.plugins.IndexStorePlugin; import org.opensearch.script.ScriptService; import org.opensearch.search.aggregations.support.ValuesSourceRegistry; @@ -467,8 +466,7 @@ public IndexService newIndexService( IndicesFieldDataCache indicesFieldDataCache, NamedWriteableRegistry namedWriteableRegistry, BooleanSupplier idFieldDataEnabled, - ValuesSourceRegistry valuesSourceRegistry, - TransportCheckpointPublisher checkpointPublisher + ValuesSourceRegistry valuesSourceRegistry ) throws IOException { final IndexEventListener eventListener = freeze(); Function> readerWrapperFactory = indexReaderWrapper @@ -522,8 +520,7 @@ public IndexService newIndexService( allowExpensiveQueries, expressionResolver, valuesSourceRegistry, - recoveryStateFactory, - checkpointPublisher + recoveryStateFactory ); success = true; return indexService; diff --git a/server/src/main/java/org/opensearch/index/IndexService.java b/server/src/main/java/org/opensearch/index/IndexService.java index 9bf0c8da9ccc8..fdbd0a4cf8471 100644 --- a/server/src/main/java/org/opensearch/index/IndexService.java +++ b/server/src/main/java/org/opensearch/index/IndexService.java @@ -94,7 +94,7 @@ import org.opensearch.indices.fielddata.cache.IndicesFieldDataCache; import org.opensearch.indices.mapper.MapperRegistry; import org.opensearch.indices.recovery.RecoveryState; -import org.opensearch.indices.replication.checkpoint.TransportCheckpointPublisher; +import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher; import org.opensearch.plugins.IndexStorePlugin; import org.opensearch.script.ScriptService; import org.opensearch.search.aggregations.support.ValuesSourceRegistry; @@ -166,7 +166,6 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust private final IndexNameExpressionResolver expressionResolver; private final Supplier indexSortSupplier; private final ValuesSourceRegistry valuesSourceRegistry; - private final TransportCheckpointPublisher checkpointPublisher; public IndexService( IndexSettings indexSettings, @@ -197,8 +196,7 @@ public IndexService( BooleanSupplier allowExpensiveQueries, IndexNameExpressionResolver expressionResolver, ValuesSourceRegistry valuesSourceRegistry, - IndexStorePlugin.RecoveryStateFactory recoveryStateFactory, - TransportCheckpointPublisher checkpointPublisher + IndexStorePlugin.RecoveryStateFactory recoveryStateFactory ) { super(indexSettings); this.allowExpensiveQueries = allowExpensiveQueries; @@ -209,7 +207,6 @@ public IndexService( this.circuitBreakerService = circuitBreakerService; this.expressionResolver = expressionResolver; this.valuesSourceRegistry = valuesSourceRegistry; - this.checkpointPublisher = checkpointPublisher; if (needsMapperService(indexSettings, indexCreationContext)) { assert indexAnalyzers != null; this.mapperService = new MapperService( @@ -422,7 +419,8 @@ private long getAvgShardSizeInBytes() throws IOException { public synchronized IndexShard createShard( final ShardRouting routing, final Consumer globalCheckpointSyncer, - final RetentionLeaseSyncer retentionLeaseSyncer + final RetentionLeaseSyncer retentionLeaseSyncer, + final SegmentReplicationCheckpointPublisher checkpointPublisher ) throws IOException { Objects.requireNonNull(retentionLeaseSyncer); /* diff --git a/server/src/main/java/org/opensearch/index/shard/CheckpointRefreshListener.java b/server/src/main/java/org/opensearch/index/shard/CheckpointRefreshListener.java index 1c92396c167ad..ecfc79a31a15f 100644 --- a/server/src/main/java/org/opensearch/index/shard/CheckpointRefreshListener.java +++ b/server/src/main/java/org/opensearch/index/shard/CheckpointRefreshListener.java @@ -11,7 +11,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.lucene.search.ReferenceManager; -import org.opensearch.indices.replication.checkpoint.TransportCheckpointPublisher; +import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher; import java.io.IOException; @@ -24,9 +24,9 @@ public class CheckpointRefreshListener implements ReferenceManager.RefreshListen protected static Logger logger = LogManager.getLogger(CheckpointRefreshListener.class); private final IndexShard shard; - private final TransportCheckpointPublisher publisher; + private final SegmentReplicationCheckpointPublisher publisher; - public CheckpointRefreshListener(IndexShard shard, TransportCheckpointPublisher publisher) { + public CheckpointRefreshListener(IndexShard shard, SegmentReplicationCheckpointPublisher publisher) { this.shard = shard; this.publisher = publisher; } @@ -38,8 +38,8 @@ public void beforeRefresh() throws IOException { @Override public void afterRefresh(boolean didRefresh) throws IOException { - if (shard.routingEntry().primary()) { - publisher.publish(shard.getLatestReplicationCheckpoint()); + if (didRefresh) { + publisher.publish(shard); } } } diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index a792a8914c817..6ae827b553519 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -161,7 +161,7 @@ import org.opensearch.indices.recovery.RecoveryTarget; import org.opensearch.indices.replication.SegmentReplicationReplicaService; import org.opensearch.indices.replication.checkpoint.PublishCheckpointRequest; -import org.opensearch.indices.replication.checkpoint.TransportCheckpointPublisher; +import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher; import org.opensearch.indices.replication.copy.PrimaryShardReplicationSource; import org.opensearch.indices.replication.copy.ReplicationCheckpoint; import org.opensearch.indices.replication.copy.ReplicationFailedException; @@ -327,7 +327,7 @@ public IndexShard( final Runnable globalCheckpointSyncer, final RetentionLeaseSyncer retentionLeaseSyncer, final CircuitBreakerService circuitBreakerService, - final TransportCheckpointPublisher checkpointPublisher + final SegmentReplicationCheckpointPublisher checkpointPublisher ) throws IOException { super(shardRouting.shardId(), indexSettings); assert shardRouting.initializing(); diff --git a/server/src/main/java/org/opensearch/indices/IndicesModule.java b/server/src/main/java/org/opensearch/indices/IndicesModule.java index 50ead5519d574..5efab2d3633ab 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesModule.java +++ b/server/src/main/java/org/opensearch/indices/IndicesModule.java @@ -73,7 +73,7 @@ import org.opensearch.index.shard.PrimaryReplicaSyncer; import org.opensearch.indices.cluster.IndicesClusterStateService; import org.opensearch.indices.mapper.MapperRegistry; -import org.opensearch.indices.replication.checkpoint.TransportPublishShardCheckpointAction; +import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher; import org.opensearch.indices.store.IndicesStore; import org.opensearch.indices.store.TransportNodesListShardStoreMetadata; import org.opensearch.plugins.MapperPlugin; @@ -276,8 +276,8 @@ protected void configure() { bind(PrimaryReplicaSyncer.class).asEagerSingleton(); bind(RetentionLeaseSyncAction.class).asEagerSingleton(); bind(RetentionLeaseBackgroundSyncAction.class).asEagerSingleton(); - bind(TransportPublishShardCheckpointAction.class).asEagerSingleton(); bind(RetentionLeaseSyncer.class).asEagerSingleton(); + bind(SegmentReplicationCheckpointPublisher.class).asEagerSingleton(); } /** diff --git a/server/src/main/java/org/opensearch/indices/IndicesService.java b/server/src/main/java/org/opensearch/indices/IndicesService.java index b85a65c7e67be..4d5b22865a2a0 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesService.java +++ b/server/src/main/java/org/opensearch/indices/IndicesService.java @@ -138,7 +138,7 @@ import org.opensearch.indices.recovery.PeerRecoveryTargetService; import org.opensearch.indices.recovery.RecoveryState; import org.opensearch.indices.replication.SegmentReplicationReplicaService; -import org.opensearch.indices.replication.checkpoint.TransportCheckpointPublisher; +import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher; import org.opensearch.indices.replication.copy.PrimaryShardReplicationSource; import org.opensearch.node.Node; import org.opensearch.plugins.IndexStorePlugin; @@ -740,8 +740,7 @@ private synchronized IndexService createIndexService( indicesFieldDataCache, namedWriteableRegistry, this::isIdFieldDataEnabled, - valuesSourceRegistry, - new TransportCheckpointPublisher(client) + valuesSourceRegistry ); } @@ -840,6 +839,7 @@ public IndexShard createShard( final SegmentReplicationReplicaService segmentReplicationReplicaService, final SegmentReplicationReplicaService.SegmentReplicationListener segRepListener, final PrimaryShardReplicationSource replicationSource, + final SegmentReplicationCheckpointPublisher checkpointPublisher, final PeerRecoveryTargetService recoveryTargetService, final PeerRecoveryTargetService.RecoveryListener recoveryListener, final RepositoriesService repositoriesService, @@ -850,11 +850,12 @@ public IndexShard createShard( final DiscoveryNode sourceNode ) throws IOException { Objects.requireNonNull(retentionLeaseSyncer); + Objects.requireNonNull(checkpointPublisher); ensureChangesAllowed(); IndexService indexService = indexService(shardRouting.index()); assert indexService != null; RecoveryState recoveryState = indexService.createRecoveryState(shardRouting, targetNode, sourceNode); - IndexShard indexShard = indexService.createShard(shardRouting, globalCheckpointSyncer, retentionLeaseSyncer); + IndexShard indexShard = indexService.createShard(shardRouting, globalCheckpointSyncer, retentionLeaseSyncer, checkpointPublisher); indexShard.addShardFailureCallback(onShardFailure); indexShard.startRecovery( recoveryState, diff --git a/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java b/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java index e09add708b14c..019329ecb2801 100644 --- a/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java +++ b/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java @@ -81,6 +81,7 @@ import org.opensearch.indices.recovery.RecoveryFailedException; import org.opensearch.indices.recovery.RecoveryState; import org.opensearch.indices.replication.SegmentReplicationReplicaService; +import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher; import org.opensearch.indices.replication.copy.PrimaryShardReplicationSource; import org.opensearch.indices.replication.copy.ReplicationFailedException; import org.opensearch.indices.replication.copy.SegmentReplicationState; @@ -138,6 +139,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple private final PrimaryReplicaSyncer primaryReplicaSyncer; private final Consumer globalCheckpointSyncer; private final RetentionLeaseSyncer retentionLeaseSyncer; + private final SegmentReplicationCheckpointPublisher checkpointPublisher; @Inject public IndicesClusterStateService( @@ -156,7 +158,8 @@ public IndicesClusterStateService( final GlobalCheckpointSyncAction globalCheckpointSyncAction, final RetentionLeaseSyncer retentionLeaseSyncer, final SegmentReplicationReplicaService replicationReplicaService, - final PrimaryShardReplicationSource replicationSource + final PrimaryShardReplicationSource replicationSource, + final SegmentReplicationCheckpointPublisher checkpointPublisher ) { this( settings, @@ -165,6 +168,7 @@ public IndicesClusterStateService( threadPool, replicationReplicaService, replicationSource, + checkpointPublisher, recoveryTargetService, shardStateAction, nodeMappingRefreshAction, @@ -186,6 +190,7 @@ public IndicesClusterStateService( final ThreadPool threadPool, final SegmentReplicationReplicaService replicationReplicaService, final PrimaryShardReplicationSource replicationSource, + final SegmentReplicationCheckpointPublisher checkpointPublisher, final PeerRecoveryTargetService recoveryTargetService, final ShardStateAction shardStateAction, final NodeMappingRefreshAction nodeMappingRefreshAction, @@ -200,6 +205,7 @@ public IndicesClusterStateService( this.settings = settings; this.segmentReplicationReplicaService = replicationReplicaService; this.replicationSource = replicationSource; + this.checkpointPublisher = checkpointPublisher; this.buildInIndexListener = Arrays.asList(peerRecoverySourceService, recoveryTargetService, searchService, snapshotShardsService); this.indicesService = indicesService; this.clusterService = clusterService; @@ -632,6 +638,7 @@ private void createShard(DiscoveryNodes nodes, RoutingTable routingTable, ShardR segmentReplicationReplicaService, new ShardRoutingReplicationListener(shardRouting, primaryTerm), replicationSource, + checkpointPublisher, recoveryTargetService, new ShardRoutingRecoveryListener(shardRouting, primaryTerm), repositoriesService, @@ -1030,6 +1037,7 @@ T createShard( SegmentReplicationReplicaService replicaService, SegmentReplicationReplicaService.SegmentReplicationListener segRepListener, PrimaryShardReplicationSource replicationSource, + SegmentReplicationCheckpointPublisher checkpointPublisher, PeerRecoveryTargetService recoveryTargetService, PeerRecoveryTargetService.RecoveryListener recoveryListener, RepositoriesService repositoriesService, diff --git a/server/src/main/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointAction.java b/server/src/main/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointAction.java index ece59432f4a37..b655edd8c6d79 100644 --- a/server/src/main/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointAction.java +++ b/server/src/main/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointAction.java @@ -8,14 +8,166 @@ package org.opensearch.indices.replication.checkpoint; -import org.opensearch.action.ActionType; -import org.opensearch.action.admin.indices.refresh.RefreshResponse; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.apache.lucene.store.AlreadyClosedException; +import org.opensearch.ExceptionsHelper; +import org.opensearch.action.ActionListener; +import org.opensearch.action.support.ActionFilters; +import org.opensearch.action.support.replication.ReplicationResponse; +import org.opensearch.action.support.replication.ReplicationTask; +import org.opensearch.action.support.replication.TransportReplicationAction; +import org.opensearch.cluster.action.shard.ShardStateAction; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.inject.Inject; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.concurrent.ThreadContext; +import org.opensearch.index.IndexNotFoundException; +import org.opensearch.index.shard.IndexShard; +import org.opensearch.index.shard.IndexShardClosedException; +import org.opensearch.indices.IndicesService; +import org.opensearch.indices.replication.SegmentReplicationReplicaService; +import org.opensearch.indices.replication.copy.PrimaryShardReplicationSource; +import org.opensearch.node.NodeClosedException; +import org.opensearch.tasks.Task; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.transport.TransportException; +import org.opensearch.transport.TransportResponseHandler; +import org.opensearch.transport.TransportService; -public class PublishCheckpointAction extends ActionType { - public static final PublishCheckpointAction INSTANCE = new PublishCheckpointAction(); - public static final String NAME = "indices:admin/publishCheckpoint"; +import java.io.IOException; +import java.util.Objects; - private PublishCheckpointAction() { - super(NAME, RefreshResponse::new); +public class PublishCheckpointAction extends TransportReplicationAction< + PublishCheckpointRequest, + PublishCheckpointRequest, + ReplicationResponse> { + + public static final String ACTION_NAME = "indices:admin/publishCheckpoint"; + protected static Logger logger = LogManager.getLogger(PublishCheckpointAction.class); + + private final SegmentReplicationReplicaService replicationService; + private final PrimaryShardReplicationSource source; + + @Inject + public PublishCheckpointAction( + Settings settings, + TransportService transportService, + ClusterService clusterService, + IndicesService indicesService, + ThreadPool threadPool, + ShardStateAction shardStateAction, + ActionFilters actionFilters, + SegmentReplicationReplicaService segmentCopyService, + PrimaryShardReplicationSource source + ) { + super( + settings, + ACTION_NAME, + transportService, + clusterService, + indicesService, + threadPool, + shardStateAction, + actionFilters, + PublishCheckpointRequest::new, + PublishCheckpointRequest::new, + ThreadPool.Names.REFRESH + ); + this.replicationService = segmentCopyService; + this.source = source; + } + + @Override + protected ReplicationResponse newResponseInstance(StreamInput in) throws IOException { + return new ReplicationResponse(in); + } + + @Override + protected void doExecute(Task task, PublishCheckpointRequest request, ActionListener listener) { + assert false : "use PublishCheckpointAction#publish"; + } + + final void publish(IndexShard indexShard) { + String primaryAllocationId = indexShard.routingEntry().allocationId().getId(); + long primaryTerm = indexShard.getPendingPrimaryTerm(); + final ThreadContext threadContext = threadPool.getThreadContext(); + try (ThreadContext.StoredContext ignore = threadContext.stashContext()) { + // we have to execute under the system context so that if security is enabled the sync is authorized + threadContext.markAsSystemContext(); + PublishCheckpointRequest request = new PublishCheckpointRequest(indexShard.getLatestReplicationCheckpoint()); + final ReplicationTask task = (ReplicationTask) taskManager.register("transport", "segrep_publish_checkpoint", request); + transportService.sendChildRequest( + clusterService.localNode(), + transportPrimaryAction, + new ConcreteShardRequest<>(request, primaryAllocationId, primaryTerm), + task, + transportOptions, + new TransportResponseHandler() { + @Override + public ReplicationResponse read(StreamInput in) throws IOException { + return newResponseInstance(in); + } + + @Override + public String executor() { + return ThreadPool.Names.SAME; + } + + @Override + public void handleResponse(ReplicationResponse response) { + task.setPhase("finished"); + taskManager.unregister(task); + } + + @Override + public void handleException(TransportException e) { + task.setPhase("finished"); + taskManager.unregister(task); + if (ExceptionsHelper.unwrap(e, NodeClosedException.class) != null) { + // node shutting down + return; + } + if (ExceptionsHelper.unwrap( + e, + IndexNotFoundException.class, + AlreadyClosedException.class, + IndexShardClosedException.class + ) != null) { + // the index was deleted or the shard is closed + return; + } + logger.warn( + new ParameterizedMessage("{} segment replication checkpoint publishing failed", indexShard.shardId()), + e + ); + } + } + ); + } + } + + @Override + protected void shardOperationOnPrimary( + PublishCheckpointRequest request, + IndexShard primary, + ActionListener> listener + ) { + ActionListener.completeWith(listener, () -> new PrimaryResult<>(request, new ReplicationResponse())); + } + + @Override + protected void shardOperationOnReplica(PublishCheckpointRequest request, IndexShard replica, ActionListener listener) { + Objects.requireNonNull(request); + Objects.requireNonNull(replica); + ActionListener.completeWith(listener, () -> { + logger.trace("Checkpoint received on replica {}", request); + if (request.getCheckpoint().getShardId().equals(replica.shardId())) { + replica.onNewCheckpoint(request, source, replicationService); + } + return new ReplicaResult(); + }); } } diff --git a/server/src/main/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointRequest.java b/server/src/main/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointRequest.java index 61a508f50d574..a51a234e3d9de 100644 --- a/server/src/main/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointRequest.java +++ b/server/src/main/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointRequest.java @@ -8,19 +8,19 @@ package org.opensearch.indices.replication.checkpoint; -import org.opensearch.action.support.broadcast.BroadcastRequest; +import org.opensearch.action.support.replication.ReplicationRequest; import org.opensearch.common.io.stream.StreamInput; import org.opensearch.common.io.stream.StreamOutput; import org.opensearch.indices.replication.copy.ReplicationCheckpoint; import java.io.IOException; -public class PublishCheckpointRequest extends BroadcastRequest { +public class PublishCheckpointRequest extends ReplicationRequest { - private ReplicationCheckpoint checkpoint; + private final ReplicationCheckpoint checkpoint; - public PublishCheckpointRequest(ReplicationCheckpoint checkpoint, String... indices) { - super(indices); + public PublishCheckpointRequest(ReplicationCheckpoint checkpoint) { + super(checkpoint.getShardId()); this.checkpoint = checkpoint; } diff --git a/server/src/main/java/org/opensearch/indices/replication/checkpoint/SegmentReplicationCheckpointPublisher.java b/server/src/main/java/org/opensearch/indices/replication/checkpoint/SegmentReplicationCheckpointPublisher.java new file mode 100644 index 0000000000000..750213eb0c086 --- /dev/null +++ b/server/src/main/java/org/opensearch/indices/replication/checkpoint/SegmentReplicationCheckpointPublisher.java @@ -0,0 +1,38 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.indices.replication.checkpoint; + +import org.opensearch.common.inject.Inject; +import org.opensearch.index.shard.IndexShard; + +import java.util.Objects; + +public class SegmentReplicationCheckpointPublisher { + + private final PublishAction publishAction; + + @Inject + public SegmentReplicationCheckpointPublisher(PublishCheckpointAction publishAction) { + this(publishAction::publish); + } + + public SegmentReplicationCheckpointPublisher(PublishAction publishAction) { + this.publishAction = Objects.requireNonNull(publishAction); + } + + public void publish(IndexShard indexShard) { + publishAction.publish(indexShard); + } + + public interface PublishAction { + void publish(IndexShard indexShard); + } + + public static final SegmentReplicationCheckpointPublisher EMPTY = new SegmentReplicationCheckpointPublisher(indexShard -> {}); +} diff --git a/server/src/main/java/org/opensearch/indices/replication/checkpoint/ShardPublishCheckpointRequest.java b/server/src/main/java/org/opensearch/indices/replication/checkpoint/ShardPublishCheckpointRequest.java deleted file mode 100644 index 61bfac16812b3..0000000000000 --- a/server/src/main/java/org/opensearch/indices/replication/checkpoint/ShardPublishCheckpointRequest.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.indices.replication.checkpoint; - -import org.opensearch.action.support.replication.ReplicationRequest; -import org.opensearch.common.io.stream.StreamInput; -import org.opensearch.common.io.stream.StreamOutput; -import org.opensearch.index.shard.ShardId; - -import java.io.IOException; - -public class ShardPublishCheckpointRequest extends ReplicationRequest { - - private final PublishCheckpointRequest request; - - public ShardPublishCheckpointRequest(PublishCheckpointRequest request, ShardId shardId) { - super(shardId); - this.request = request; - } - - public ShardPublishCheckpointRequest(StreamInput in) throws IOException { - super(in); - this.request = new PublishCheckpointRequest(in); - } - - PublishCheckpointRequest getRequest() { - return request; - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); - request.writeTo(out); - } - - @Override - public String toString() { - return "ShardPublishCheckpointRequest{" + shardId + "}"; - } -} diff --git a/server/src/main/java/org/opensearch/indices/replication/checkpoint/TransportCheckpointPublisher.java b/server/src/main/java/org/opensearch/indices/replication/checkpoint/TransportCheckpointPublisher.java deleted file mode 100644 index b512b1d201499..0000000000000 --- a/server/src/main/java/org/opensearch/indices/replication/checkpoint/TransportCheckpointPublisher.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.indices.replication.checkpoint; - -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.opensearch.action.ActionListener; -import org.opensearch.action.admin.indices.refresh.RefreshResponse; -import org.opensearch.client.Client; -import org.opensearch.indices.replication.copy.ReplicationCheckpoint; - -public class TransportCheckpointPublisher { - - protected static Logger logger = LogManager.getLogger(TransportCheckpointPublisher.class); - - private final Client client; - - public TransportCheckpointPublisher(Client client) { - this.client = client; - } - - public void publish(ReplicationCheckpoint checkpoint) { - logger.trace("Publishing Checkpoint {}", checkpoint); - client.admin().indices().publishCheckpoint(new PublishCheckpointRequest(checkpoint), new ActionListener() { - @Override - public void onResponse(RefreshResponse response) { - logger.trace("Successfully published checkpoints"); - } - - @Override - public void onFailure(Exception e) { - logger.error("Publishing Checkpoints from primary to replicas failed", e); - } - }); - } -} diff --git a/server/src/main/java/org/opensearch/indices/replication/checkpoint/TransportPublishCheckpointAction.java b/server/src/main/java/org/opensearch/indices/replication/checkpoint/TransportPublishCheckpointAction.java deleted file mode 100644 index 3d66b041eae14..0000000000000 --- a/server/src/main/java/org/opensearch/indices/replication/checkpoint/TransportPublishCheckpointAction.java +++ /dev/null @@ -1,68 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.indices.replication.checkpoint; - -import org.opensearch.action.admin.indices.refresh.RefreshResponse; -import org.opensearch.action.support.ActionFilters; -import org.opensearch.action.support.DefaultShardOperationFailedException; -import org.opensearch.action.support.replication.ReplicationResponse; -import org.opensearch.action.support.replication.TransportBroadcastReplicationAction; -import org.opensearch.cluster.metadata.IndexNameExpressionResolver; -import org.opensearch.cluster.service.ClusterService; -import org.opensearch.common.inject.Inject; -import org.opensearch.index.shard.ShardId; -import org.opensearch.transport.TransportService; - -import java.util.List; - -public class TransportPublishCheckpointAction extends TransportBroadcastReplicationAction< - PublishCheckpointRequest, - RefreshResponse, - ShardPublishCheckpointRequest, - ReplicationResponse> { - - @Inject - public TransportPublishCheckpointAction( - ClusterService clusterService, - TransportService transportService, - ActionFilters actionFilters, - IndexNameExpressionResolver indexNameExpressionResolver, - TransportPublishShardCheckpointAction shardCheckpointAction - ) { - super( - PublishCheckpointAction.NAME, - PublishCheckpointRequest::new, - clusterService, - transportService, - actionFilters, - indexNameExpressionResolver, - shardCheckpointAction - ); - } - - @Override - protected ReplicationResponse newShardResponse() { - return new ReplicationResponse(); - } - - @Override - protected ShardPublishCheckpointRequest newShardRequest(PublishCheckpointRequest request, ShardId shardId) { - return new ShardPublishCheckpointRequest(request, shardId); - } - - @Override - protected RefreshResponse newResponse( - int successfulShards, - int failedShards, - int totalNumCopies, - List shardFailures - ) { - return new RefreshResponse(totalNumCopies, successfulShards, failedShards, shardFailures); - } -} diff --git a/server/src/main/java/org/opensearch/indices/replication/checkpoint/TransportPublishShardCheckpointAction.java b/server/src/main/java/org/opensearch/indices/replication/checkpoint/TransportPublishShardCheckpointAction.java deleted file mode 100644 index c0f55bb70f325..0000000000000 --- a/server/src/main/java/org/opensearch/indices/replication/checkpoint/TransportPublishShardCheckpointAction.java +++ /dev/null @@ -1,103 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.indices.replication.checkpoint; - -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.opensearch.action.ActionListener; -import org.opensearch.action.support.ActionFilters; -import org.opensearch.action.support.replication.ReplicationResponse; -import org.opensearch.action.support.replication.TransportReplicationAction; -import org.opensearch.cluster.action.shard.ShardStateAction; -import org.opensearch.cluster.service.ClusterService; -import org.opensearch.common.inject.Inject; -import org.opensearch.common.io.stream.StreamInput; -import org.opensearch.common.settings.Settings; -import org.opensearch.index.shard.IndexShard; -import org.opensearch.indices.IndicesService; -import org.opensearch.indices.replication.SegmentReplicationReplicaService; -import org.opensearch.indices.replication.copy.PrimaryShardReplicationSource; -import org.opensearch.threadpool.ThreadPool; -import org.opensearch.transport.TransportService; - -import java.io.IOException; - -public class TransportPublishShardCheckpointAction extends TransportReplicationAction< - ShardPublishCheckpointRequest, - ShardPublishCheckpointRequest, - ReplicationResponse> { - - protected static Logger logger = LogManager.getLogger(TransportPublishShardCheckpointAction.class); - - public static final String ACTION_NAME = PublishCheckpointAction.NAME + "[s]"; - - private final SegmentReplicationReplicaService replicationService; - private final PrimaryShardReplicationSource source; - - @Inject - public TransportPublishShardCheckpointAction( - Settings settings, - TransportService transportService, - ClusterService clusterService, - IndicesService indicesService, - ThreadPool threadPool, - ShardStateAction shardStateAction, - ActionFilters actionFilters, - SegmentReplicationReplicaService segmentCopyService, - PrimaryShardReplicationSource source - ) { - super( - settings, - ACTION_NAME, - transportService, - clusterService, - indicesService, - threadPool, - shardStateAction, - actionFilters, - ShardPublishCheckpointRequest::new, - ShardPublishCheckpointRequest::new, - ThreadPool.Names.SNAPSHOT - ); - this.replicationService = segmentCopyService; - this.source = source; - } - - @Override - protected ReplicationResponse newResponseInstance(StreamInput in) throws IOException { - return new ReplicationResponse(in); - } - - @Override - protected void shardOperationOnPrimary( - ShardPublishCheckpointRequest shardRequest, - IndexShard primary, - ActionListener> listener - ) { - ActionListener.completeWith(listener, () -> new PrimaryResult<>(shardRequest, new ReplicationResponse())); - } - - @Override - protected void shardOperationOnReplica( - ShardPublishCheckpointRequest shardRequest, - IndexShard replica, - ActionListener listener - ) { - ActionListener.completeWith(listener, () -> { - PublishCheckpointRequest request = shardRequest.getRequest(); - logger.trace("Checkpoint received on replica {}", request); - if (request.getCheckpoint().getShardId().equals(replica.shardId())) { - replica.onNewCheckpoint(request, source, replicationService); - } - // TODO: Segrep - These requests are getting routed to all shards across all indices. - // We should only publish to replicas of the updated index. - return new ReplicaResult(); - }); - } -} diff --git a/server/src/test/java/org/opensearch/index/IndexModuleTests.java b/server/src/test/java/org/opensearch/index/IndexModuleTests.java index b1b33e03c4b18..db69690f1df00 100644 --- a/server/src/test/java/org/opensearch/index/IndexModuleTests.java +++ b/server/src/test/java/org/opensearch/index/IndexModuleTests.java @@ -97,7 +97,6 @@ import org.opensearch.indices.fielddata.cache.IndicesFieldDataCache; import org.opensearch.indices.mapper.MapperRegistry; import org.opensearch.indices.recovery.RecoveryState; -import org.opensearch.indices.replication.checkpoint.TransportCheckpointPublisher; import org.opensearch.plugins.IndexStorePlugin; import org.opensearch.script.ScriptService; import org.opensearch.search.internal.ReaderContext; @@ -196,7 +195,6 @@ public void tearDown() throws Exception { private IndexService newIndexService(IndexModule module) throws IOException { NoOpClient noOpClient = new NoOpClient(this.getTestName()); - TransportCheckpointPublisher checkpointPublisher = new TransportCheckpointPublisher(noOpClient); return module.newIndexService( CREATE_INDEX, nodeEnvironment, @@ -213,8 +211,7 @@ private IndexService newIndexService(IndexModule module) throws IOException { new IndicesFieldDataCache(settings, listener), writableRegistry(), () -> false, - null, - checkpointPublisher + null ); } diff --git a/server/src/test/java/org/opensearch/indices/IndicesLifecycleListenerSingleNodeTests.java b/server/src/test/java/org/opensearch/indices/IndicesLifecycleListenerSingleNodeTests.java index 5f3d03f85f324..0989bf869f18e 100644 --- a/server/src/test/java/org/opensearch/indices/IndicesLifecycleListenerSingleNodeTests.java +++ b/server/src/test/java/org/opensearch/indices/IndicesLifecycleListenerSingleNodeTests.java @@ -49,6 +49,7 @@ import org.opensearch.index.shard.ShardId; import org.opensearch.indices.cluster.IndicesClusterStateService.AllocatedIndices.IndexRemovalReason; import org.opensearch.indices.recovery.RecoveryState; +import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher; import org.opensearch.test.OpenSearchSingleNodeTestCase; import java.util.Arrays; @@ -148,7 +149,12 @@ public void afterIndexRemoved(Index index, IndexSettings indexSettings, IndexRem newRouting = newRouting.moveToUnassigned(unassignedInfo) .updateUnassigned(unassignedInfo, RecoverySource.EmptyStoreRecoverySource.INSTANCE); newRouting = ShardRoutingHelper.initialize(newRouting, nodeId); - IndexShard shard = index.createShard(newRouting, s -> {}, RetentionLeaseSyncer.EMPTY); + IndexShard shard = index.createShard( + newRouting, + s -> {}, + RetentionLeaseSyncer.EMPTY, + SegmentReplicationCheckpointPublisher.EMPTY + ); IndexShardTestCase.updateRoutingEntry(shard, newRouting); assertEquals(5, counter.get()); final DiscoveryNode localNode = new DiscoveryNode( diff --git a/server/src/test/java/org/opensearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java b/server/src/test/java/org/opensearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java index 278c8c3408adc..94abc3d9fb44a 100644 --- a/server/src/test/java/org/opensearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java +++ b/server/src/test/java/org/opensearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java @@ -58,6 +58,7 @@ import org.opensearch.indices.recovery.PeerRecoveryTargetService; import org.opensearch.indices.recovery.RecoveryState; import org.opensearch.indices.replication.SegmentReplicationReplicaService; +import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher; import org.opensearch.indices.replication.copy.PrimaryShardReplicationSource; import org.opensearch.repositories.RepositoriesService; import org.opensearch.test.OpenSearchTestCase; @@ -257,6 +258,7 @@ public MockIndexShard createShard( final SegmentReplicationReplicaService replicaService, final SegmentReplicationReplicaService.SegmentReplicationListener segRepListener, final PrimaryShardReplicationSource replicationSource, + final SegmentReplicationCheckpointPublisher checkpointPublisher, final PeerRecoveryTargetService recoveryTargetService, final PeerRecoveryTargetService.RecoveryListener recoveryListener, final RepositoriesService repositoriesService, diff --git a/server/src/test/java/org/opensearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java b/server/src/test/java/org/opensearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java index 0f2ccda8291a1..bc502ec716d28 100644 --- a/server/src/test/java/org/opensearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java +++ b/server/src/test/java/org/opensearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java @@ -67,6 +67,7 @@ import org.opensearch.index.shard.ShardId; import org.opensearch.indices.recovery.PeerRecoveryTargetService; import org.opensearch.indices.replication.SegmentReplicationReplicaService; +import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher; import org.opensearch.indices.replication.copy.PrimaryShardReplicationSource; import org.opensearch.repositories.RepositoriesService; import org.opensearch.threadpool.TestThreadPool; @@ -578,6 +579,7 @@ private IndicesClusterStateService createIndicesClusterStateService( threadPool, replicationReplicaService, replicationSource, + SegmentReplicationCheckpointPublisher.EMPTY, recoveryTargetService, shardStateAction, null, diff --git a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java index ded22610afbd9..94d068d17fc50 100644 --- a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java @@ -183,6 +183,7 @@ import org.opensearch.indices.recovery.PeerRecoveryTargetService; import org.opensearch.indices.recovery.RecoverySettings; import org.opensearch.indices.replication.SegmentReplicationReplicaService; +import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher; import org.opensearch.indices.replication.copy.PrimaryShardReplicationSource; import org.opensearch.ingest.IngestService; import org.opensearch.monitor.StatusInfo; @@ -1869,7 +1870,8 @@ public void onFailure(final Exception e) { ), RetentionLeaseSyncer.EMPTY, replicaService, - new PrimaryShardReplicationSource(transportService, clusterService, indicesService, recoverySettings, replicaService) + new PrimaryShardReplicationSource(transportService, clusterService, indicesService, recoverySettings, replicaService), + SegmentReplicationCheckpointPublisher.EMPTY ); Map actions = new HashMap<>(); final ShardLimitValidator shardLimitValidator = new ShardLimitValidator(settings, clusterService); diff --git a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java index 81e3477725ca7..f2a95826b8844 100644 --- a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java @@ -93,7 +93,7 @@ import org.opensearch.indices.recovery.RecoveryState; import org.opensearch.indices.recovery.RecoveryTarget; import org.opensearch.indices.recovery.StartRecoveryRequest; -import org.opensearch.indices.replication.checkpoint.TransportCheckpointPublisher; +import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher; import org.opensearch.repositories.IndexId; import org.opensearch.repositories.Repository; import org.opensearch.repositories.blobstore.OpenSearchBlobStoreRepositoryIntegTestCase; @@ -461,7 +461,6 @@ protected IndexShard newShard( ); // No-op checkpoint publisher for backwards compatibliity NoOpClient noOpClient = new NoOpClient(this.getTestName()); - TransportCheckpointPublisher checkpointPublisher = new TransportCheckpointPublisher(noOpClient); indexShard = new IndexShard( routing, indexSettings, @@ -483,7 +482,7 @@ protected IndexShard newShard( globalCheckpointSyncer, retentionLeaseSyncer, breakerService, - checkpointPublisher + SegmentReplicationCheckpointPublisher.EMPTY ); indexShard.addShardFailureCallback(DEFAULT_SHARD_FAILURE_HANDLER); success = true;