[Backport 2.10] Fix CCR compatibility with remote translogs #1277

Merged
merged 1 commit on Oct 18, 2023
15 changes: 12 additions & 3 deletions src/main/kotlin/org/opensearch/replication/ReplicationPlugin.kt
@@ -144,12 +144,14 @@ import java.util.Optional
import java.util.function.Supplier

import org.opensearch.index.engine.NRTReplicationEngine
+ import org.opensearch.replication.util.ValidationUtil


@OpenForTesting
internal class ReplicationPlugin : Plugin(), ActionPlugin, PersistentTaskPlugin, RepositoryPlugin, EnginePlugin {

private lateinit var client: Client
+ private lateinit var clusterService: ClusterService
private lateinit var threadPool: ThreadPool
private lateinit var replicationMetadataManager: ReplicationMetadataManager
private lateinit var replicationSettings: ReplicationSettings
@@ -207,6 +209,7 @@ internal class ReplicationPlugin : Plugin(), ActionPlugin, PersistentTaskPlugin,
repositoriesService: Supplier<RepositoriesService>): Collection<Any> {
this.client = client
this.threadPool = threadPool
+ this.clusterService = clusterService
this.replicationMetadataManager = ReplicationMetadataManager(clusterService, client,
ReplicationMetadataStore(client, clusterService, xContentRegistry))
this.replicationSettings = ReplicationSettings(clusterService)
@@ -379,9 +382,15 @@ internal class ReplicationPlugin : Plugin(), ActionPlugin, PersistentTaskPlugin,
}

override fun getCustomTranslogDeletionPolicyFactory(): Optional<TranslogDeletionPolicyFactory> {
- return Optional.of(TranslogDeletionPolicyFactory{
-     indexSettings, retentionLeasesSupplier -> ReplicationTranslogDeletionPolicy(indexSettings, retentionLeasesSupplier)
- })
+ // We don't need a retention lease translog deletion policy for remote store enabled clusters as
+ // we fetch the operations directly from lucene in such cases.
+ return if (ValidationUtil.isRemoteStoreEnabledCluster(clusterService) == false) {
+     Optional.of(TranslogDeletionPolicyFactory { indexSettings, retentionLeasesSupplier ->
+         ReplicationTranslogDeletionPolicy(indexSettings, retentionLeasesSupplier)
+     })
+ } else {
+     Optional.empty()
+ }
}

override fun onIndexModule(indexModule: IndexModule) {
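The hunk above only registers the plugin's retention-lease translog deletion policy when the cluster is not remote-store enabled; otherwise it returns Optional.empty() and the default policy applies. A minimal, self-contained sketch of that decision is below; DeletionPolicyFactory and the function names are placeholders standing in for the OpenSearch interfaces, not the plugin's actual classes.

import java.util.Optional

// Placeholder for OpenSearch's TranslogDeletionPolicyFactory; names here are illustrative only.
fun interface DeletionPolicyFactory {
    fun create(indexName: String): String
}

// Mirrors the decision in getCustomTranslogDeletionPolicyFactory(): register the custom
// retention-lease policy only when the cluster is NOT remote-store enabled.
fun customDeletionPolicyFactory(isRemoteStoreEnabledCluster: Boolean): Optional<DeletionPolicyFactory> =
    if (!isRemoteStoreEnabledCluster)
        Optional.of(DeletionPolicyFactory { index -> "ReplicationTranslogDeletionPolicy for $index" })
    else
        Optional.empty()

fun main() {
    println(customDeletionPolicyFactory(isRemoteStoreEnabledCluster = false).isPresent) // true
    println(customDeletionPolicyFactory(isRemoteStoreEnabledCluster = true).isPresent)  // false
}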
src/main/kotlin/org/opensearch/replication/action/changes/TransportGetChangesAction.kt
@@ -27,17 +27,15 @@ import org.opensearch.core.common.io.stream.StreamInput
import org.opensearch.core.common.io.stream.Writeable
import org.opensearch.common.unit.TimeValue
import org.opensearch.core.index.shard.ShardId
+ import org.opensearch.index.shard.IndexShard
import org.opensearch.index.translog.Translog
import org.opensearch.indices.IndicesService
import org.opensearch.replication.ReplicationPlugin.Companion.REPLICATION_INDEX_TRANSLOG_PRUNING_ENABLED_SETTING
import org.opensearch.replication.ReplicationPlugin.Companion.REPLICATION_EXECUTOR_NAME_LEADER
import org.opensearch.replication.seqno.RemoteClusterStats
import org.opensearch.replication.seqno.RemoteClusterTranslogService
import org.opensearch.replication.seqno.RemoteShardMetric
- import org.opensearch.replication.util.completeWith
- import org.opensearch.replication.util.coroutineContext
- import org.opensearch.replication.util.stackTraceToString
- import org.opensearch.replication.util.waitForGlobalCheckpoint
+ import org.opensearch.replication.util.*
import org.opensearch.threadpool.ThreadPool
import org.opensearch.transport.TransportActionProxy
import org.opensearch.transport.TransportService
@@ -79,7 +77,8 @@ class TransportGetChangesAction @Inject constructor(threadPool: ThreadPool, clus
indexMetric.lastFetchTime.set(relativeStartNanos)

val indexShard = indicesService.indexServiceSafe(shardId.index).getShard(shardId.id)
- if (indexShard.lastSyncedGlobalCheckpoint < request.fromSeqNo) {
+ val isRemoteStoreEnabled = ValidationUtil.isRemoteStoreEnabledCluster(clusterService)
+ if (lastGlobalCheckpoint(indexShard, isRemoteStoreEnabled) < request.fromSeqNo) {
// There are no new operations to sync. Do a long poll and wait for GlobalCheckpoint to advance. If
// the checkpoint doesn't advance by the timeout this throws an ESTimeoutException which the caller
// should catch and start a new poll.
@@ -88,18 +87,18 @@ class TransportGetChangesAction @Inject constructor(threadPool: ThreadPool, clus
// At this point indexShard.lastKnownGlobalCheckpoint has advanced but it may not yet have been synced
// to the translog, which means we can't return those changes. Return to the caller to retry.
// TODO: Figure out a better way to wait for the global checkpoint to be synced to the translog
- if (indexShard.lastSyncedGlobalCheckpoint < request.fromSeqNo) {
-     assert(gcp > indexShard.lastSyncedGlobalCheckpoint) { "Checkpoint didn't advance at all" }
+ if (lastGlobalCheckpoint(indexShard, isRemoteStoreEnabled) < request.fromSeqNo) {
+     assert(gcp > lastGlobalCheckpoint(indexShard, isRemoteStoreEnabled)) { "Checkpoint didn't advance at all $gcp ${lastGlobalCheckpoint(indexShard, isRemoteStoreEnabled)}" }
throw OpenSearchTimeoutException("global checkpoint not synced. Retry after a few milliseconds...")
}
}

relativeStartNanos = System.nanoTime()
// At this point lastSyncedGlobalCheckpoint is at least fromSeqNo
- val toSeqNo = min(indexShard.lastSyncedGlobalCheckpoint, request.toSeqNo)
+ val toSeqNo = min(lastGlobalCheckpoint(indexShard, isRemoteStoreEnabled), request.toSeqNo)

var ops: List<Translog.Operation> = listOf()
- var fetchFromTranslog = isTranslogPruningByRetentionLeaseEnabled(shardId)
+ var fetchFromTranslog = isTranslogPruningByRetentionLeaseEnabled(shardId) && isRemoteStoreEnabled == false
if(fetchFromTranslog) {
try {
ops = translogService.getHistoryOfOperations(indexShard, request.fromSeqNo, toSeqNo)
@@ -137,12 +136,22 @@ class TransportGetChangesAction @Inject constructor(threadPool: ThreadPool, clus
indexMetric.ops.addAndGet(ops.size.toLong())

ops.stream().forEach{op -> indexMetric.bytesRead.addAndGet(op.estimateSize()) }

- GetChangesResponse(ops, request.fromSeqNo, indexShard.maxSeqNoOfUpdatesOrDeletes, indexShard.lastSyncedGlobalCheckpoint)
+ GetChangesResponse(ops, request.fromSeqNo, indexShard.maxSeqNoOfUpdatesOrDeletes, lastGlobalCheckpoint(indexShard, isRemoteStoreEnabled))
}
}
}

+ private fun lastGlobalCheckpoint(indexShard: IndexShard, isRemoteStoreEnabled: Boolean): Long {
+     // We rely on lastSyncedGlobalCheckpoint as it has been durably written to disk. In case of remote store
+     // enabled clusters, the semantics are slightly different, and we can't use lastSyncedGlobalCheckpoint. Falling back to
+     // lastKnownGlobalCheckpoint in such cases.
+     return if (isRemoteStoreEnabled) {
+         indexShard.lastKnownGlobalCheckpoint
+     } else {
+         indexShard.lastSyncedGlobalCheckpoint
+     }
+ }


private fun isTranslogPruningByRetentionLeaseEnabled(shardId: ShardId): Boolean {
val enabled = clusterService.state().metadata.indices.get(shardId.indexName)
@@ -162,7 +171,9 @@ class TransportGetChangesAction @Inject constructor(threadPool: ThreadPool, clus
}

override fun shards(state: ClusterState, request: InternalRequest): ShardsIterator {
+ val shardIt = state.routingTable().shardRoutingTable(request.request().shardId)
  // Random active shards
- return state.routingTable().shardRoutingTable(request.request().shardId).activeInitializingShardsRandomIt()
+ return if (ValidationUtil.isRemoteStoreEnabledCluster(clusterService)) shardIt.primaryShardIt()
+ else shardIt.activeInitializingShardsRandomIt()
}
}
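The core of the leader-side change in this file is which global checkpoint the GetChanges path trusts and where it reads operations from: remote-store clusters use lastKnownGlobalCheckpoint and always fetch from Lucene, while document-replication clusters keep using lastSyncedGlobalCheckpoint and may still read from the retention-lease-pruned translog. A self-contained sketch of that selection logic follows; ShardView is a plain stand-in for IndexShard, not the real class.

// Stand-in for the two checkpoint views IndexShard exposes; field names mirror the getters used above.
data class ShardView(
    val lastSyncedGlobalCheckpoint: Long,  // durably synced to the local translog (docrep clusters)
    val lastKnownGlobalCheckpoint: Long    // in-memory checkpoint, used for remote-store clusters
)

fun lastGlobalCheckpoint(shard: ShardView, isRemoteStoreEnabled: Boolean): Long =
    if (isRemoteStoreEnabled) shard.lastKnownGlobalCheckpoint else shard.lastSyncedGlobalCheckpoint

// The translog is only read when retention-lease pruning is enabled AND the cluster is not
// remote-store enabled; otherwise operations are fetched from Lucene.
fun fetchFromTranslog(pruningByRetentionLease: Boolean, isRemoteStoreEnabled: Boolean): Boolean =
    pruningByRetentionLease && !isRemoteStoreEnabled

fun main() {
    val shard = ShardView(lastSyncedGlobalCheckpoint = 40, lastKnownGlobalCheckpoint = 42)
    println(lastGlobalCheckpoint(shard, isRemoteStoreEnabled = false)) // 40
    println(lastGlobalCheckpoint(shard, isRemoteStoreEnabled = true))  // 42
    println(fetchFromTranslog(pruningByRetentionLease = true, isRemoteStoreEnabled = true)) // false
}

The same flag also changes shard routing in the shards() override above: on a remote-store cluster GetChanges is sent to the primary via primaryShardIt(), while docrep clusters keep the random active-shard iterator.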
src/main/kotlin/org/opensearch/replication/repository/RemoteClusterRepository.kt
@@ -250,6 +250,9 @@ class RemoteClusterRepository(private val repositoryMetadata: RepositoryMetadata

// Remove translog pruning for the follower index
builder.remove(REPLICATION_INDEX_TRANSLOG_PRUNING_ENABLED_SETTING.key)
+ builder.remove(IndexMetadata.SETTING_REMOTE_STORE_ENABLED)
+ builder.remove(IndexMetadata.SETTING_REMOTE_SEGMENT_STORE_REPOSITORY)
+ builder.remove(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY)

val indexMdBuilder = IndexMetadata.builder(indexMetadata).settings(builder)
indexMetadata.aliases.values.forEach {
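When the follower builds its index metadata from the leader's, the added lines strip the leader's remote-store settings so they are not carried onto the follower index. A minimal sketch of that filtering over a plain settings map; the index.remote_store.* keys are assumed expansions of the IndexMetadata constants and are not spelled out in this diff.

// Assumed string values of IndexMetadata.SETTING_REMOTE_STORE_ENABLED,
// SETTING_REMOTE_SEGMENT_STORE_REPOSITORY and SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY.
val leaderOnlySettingKeys = setOf(
    "index.remote_store.enabled",
    "index.remote_store.segment.repository",
    "index.remote_store.translog.repository"
)

// Drop leader-only settings before applying the leader's settings to the follower index.
fun followerIndexSettings(leaderSettings: Map<String, String>): Map<String, String> =
    leaderSettings.filterKeys { it !in leaderOnlySettingKeys }

fun main() {
    val leaderSettings = mapOf(
        "index.number_of_shards" to "1",
        "index.remote_store.enabled" to "true",
        "index.remote_store.segment.repository" to "segment-repo"
    )
    println(followerIndexSettings(leaderSettings)) // {index.number_of_shards=1}
}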
src/main/kotlin/org/opensearch/replication/util/ValidationUtil.kt
@@ -24,8 +24,11 @@ import org.opensearch.env.Environment
import org.opensearch.index.IndexNotFoundException
import java.io.UnsupportedEncodingException
import org.opensearch.cluster.service.ClusterService
+ import org.opensearch.node.Node
+ import org.opensearch.node.remotestore.RemoteStoreNodeAttribute
import org.opensearch.replication.ReplicationPlugin.Companion.KNN_INDEX_SETTING
import org.opensearch.replication.ReplicationPlugin.Companion.KNN_PLUGIN_PRESENT_SETTING
+ import org.opensearch.replication.action.changes.TransportGetChangesAction
import java.nio.file.Files
import java.nio.file.Path
import java.util.Locale
@@ -154,4 +157,8 @@ object ValidationUtil {

}

+ fun isRemoteStoreEnabledCluster(clusterService: ClusterService): Boolean {
+     return clusterService.settings.getByPrefix(Node.NODE_ATTRIBUTES.key + RemoteStoreNodeAttribute.REMOTE_STORE_NODE_ATTRIBUTE_KEY_PREFIX).isEmpty == false
+ }

}
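The new helper treats the cluster as remote-store enabled when any node attribute under the remote-store prefix is present in the node settings. A self-contained sketch of the same check over a plain settings map; the literal "node.attr.remote_store" prefix is an assumption about what Node.NODE_ATTRIBUTES.key plus RemoteStoreNodeAttribute.REMOTE_STORE_NODE_ATTRIBUTE_KEY_PREFIX expands to, not something this diff states.

// Assumed expansion of Node.NODE_ATTRIBUTES.key ("node.attr.") plus
// RemoteStoreNodeAttribute.REMOTE_STORE_NODE_ATTRIBUTE_KEY_PREFIX ("remote_store").
const val REMOTE_STORE_NODE_ATTR_PREFIX = "node.attr.remote_store"

// A cluster is treated as remote-store enabled when any node setting carries the remote-store attribute prefix.
fun isRemoteStoreEnabledCluster(nodeSettings: Map<String, String>): Boolean =
    nodeSettings.keys.any { it.startsWith(REMOTE_STORE_NODE_ATTR_PREFIX) }

fun main() {
    val docrepNode = mapOf("node.name" to "follower-node-1")
    val remoteStoreNode = docrepNode + ("node.attr.remote_store.segment.repository" to "segment-repo")
    println(isRemoteStoreEnabledCluster(docrepNode))      // false
    println(isRemoteStoreEnabledCluster(remoteStoreNode)) // true
}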