Skip to content

Commit

Permalink
changed few naming conventions.
Browse files Browse the repository at this point in the history
1. remote to leader, 2. local to follower, etc

Signed-off-by: naveen pajjuri <[email protected]>
  • Loading branch information
naveen pajjuri authored and soosinha committed Aug 9, 2021
1 parent 2ca4f65 commit b6a299f
Show file tree
Hide file tree
Showing 53 changed files with 437 additions and 485 deletions.
12 changes: 6 additions & 6 deletions HANDBOOK.md
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ This API is used to initiate replication of an index from the leader cluster ont
PUT localhost:{{foll_port}}/_plugins/_replication/<index>/_start
Content-Type: application/json

{ "remote_cluster" : "leader-cluster", "remote_index": "<index>"}
{ "leader_alias" : "leader-cluster", "leader_index": "<index>"}


# RESPONSE
Expand All @@ -203,7 +203,7 @@ Content-Type: application/json
curl -k -u testuser:testuser -XPUT \
"https://${FOLLOWER}/_plugins/_replication/follower-01/_start?pretty" \
-H 'Content-type: application/json' \
-d'{"remote_cluster":"leader-cluster", "remote_index": "leader-01"}'
-d'{"leader_alias":"leader-cluster", "leader_index": "leader-01"}'

# Make sure to create an index with name leader-01 on the leader before starting replication on top of this index.
# Now there should be a ReadOnly index named 'follower-01' on the follower cluster that should continuously stay updated with changes to 'leader-01' index on the leader cluster.
Expand Down Expand Up @@ -253,7 +253,7 @@ AutoFollow API helps to automatically start replication on indices matching a pa
POST localhost:{{foll_port}}/_plugins/_replication/_autofollow
Content-Type: application/json

{"connection" : "<remote cluster connection name>", "pattern": "<index pattern>", "name": "<name to identify autofollow task>"}
{"leader_alias" : "<remote cluster connection name>", "pattern": "<index pattern>", "name": "<name to identify autofollow task>"}

# RESPONSE

Expand All @@ -267,7 +267,7 @@ Content-Type: application/json
curl -k -u testuser:testuser -XPOST \
"https://${FOLLOWER}/_plugins/_replication/_autofollow?pretty" \
-H 'Content-type: application/json' \
-d'{"connection":"leader-cluster","pattern":"leader-*", "name":"my-replication"}'
-d'{"leader_alias":"leader-cluster","pattern":"leader-*", "name":"my-replication"}'
```
## Stop AutoFollow
Expand All @@ -281,7 +281,7 @@ DELETE localhost:{{foll_port}}/_plugins/_replication/_autofollow
Content-Type: application/json

{
"connection": "leader-cluster",
"leader_alias": "leader-cluster",
"name": "test"
}
```
Expand All @@ -292,7 +292,7 @@ Content-Type: application/json
curl -k -u testuser:testuser -XDELETE \
"https://${FOLLOWER}/_plugins/_replication/_autofollow?pretty" \
-H 'Content-type: application/json' \
-d'{"connection":"leader-cluster", "name":"my-replication"}'
-d'{"leader_alias":"leader-cluster", "name":"my-replication"}'
```
## Check ongoing replication tasks
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ curl -XPOST "http://${LEADER}/leader-01/_doc/1" -H 'Content-Type: application/js
```bash
curl -XPUT "http://${FOLLOWER}/_plugins/_replication/follower-01/_start?pretty" \
-H 'Content-type: application/json' \
-d'{"remote_cluster":"leader-cluster", "remote_index": "leader-01"}'
-d'{"leader_cluster":"leader-cluster", "leader_index": "leader-01"}'
```

### Step 5: Make changes to data on leader index
Expand Down
6 changes: 3 additions & 3 deletions docs/RFC.md
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ Request
PUT $FOLLOWER/_plugins/_replication/<index>/_start
Content-Type: application/json

{ "remote_cluster" : "leader-cluster", "remote_index": "<index>"}
{ "leader_alias" : "leader-cluster", "leader_index": "<index>"}

## Response

Expand Down Expand Up @@ -148,7 +148,7 @@ POST $FOLLOWER/_plugins/_replication/_autofollow
Content-Type: application/json

{
"connection": "leader-cluster",
"leader_alias": "leader-cluster",
"name": "test",
"pattern": "*customer*"
}
Expand All @@ -163,7 +163,7 @@ DELETE $FOLLOWER/_plugins/_replication/_autofollow
Content-Type: application/json

{
"connection": "leader-cluster",
"leader_alias": "leader-cluster",
"name": "test"
}
```
Expand Down
22 changes: 11 additions & 11 deletions replication.http
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,11 @@ Content-Type: application/json


{
"remote_cluster": "remote-cluster",
"remote_index": "remote-index",
"leader_alias": "remote-cluster",
"leader_index": "leader-index",
"assume_roles": {
"remote_cluster_role": "all_access",
"local_cluster_role": "all_access"
"leader_cluster_role": "all_access",
"follower_cluster_role": "all_access"
}
}

Expand Down Expand Up @@ -97,8 +97,8 @@ Content-Type: application/json


{
"remote_cluster" : "source",
"remote_index": "customer"
"leader_alias" : "source",
"leader_index": "customer"
}

### Add another document after replication started
Expand Down Expand Up @@ -151,7 +151,7 @@ POST localhost:{{foll_port}}/_plugins/_replication/_autofollow
Content-Type: application/json

{
"connection": "source",
"leader_alias": "source",
"name": "test",
"pattern": "*customer*"
}
Expand All @@ -161,12 +161,12 @@ POST localhost:{{foll_port}}/_plugins/_replication/_autofollow
Content-Type: application/json

{
"connection": "source",
"leader_alias": "source",
"name": "test",
"pattern": "*customer*",
"assume_roles": {
"remote_cluster_role": "all_access",
"local_cluster_role": "all_access"
"leader_cluster_role": "all_access",
"follower_cluster_role": "all_access"
}
}

Expand All @@ -175,7 +175,7 @@ DELETE localhost:{{foll_port}}/_plugins/_replication/_autofollow
Content-Type: application/json

{
"connection": "source",
"leader_alias": "source",
"name": "test"
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@ import com.amazon.elasticsearch.replication.metadata.UpdateMetadataAction
import com.amazon.elasticsearch.replication.metadata.state.ReplicationStateMetadata
import com.amazon.elasticsearch.replication.metadata.store.ReplicationMetadataStore
import com.amazon.elasticsearch.replication.repository.REMOTE_REPOSITORY_TYPE
import com.amazon.elasticsearch.replication.repository.RemoteClusterRepositoriesService
import com.amazon.elasticsearch.replication.repository.RemoteClusterRepository
import com.amazon.elasticsearch.replication.repository.LeaderClusterRepositoriesService
import com.amazon.elasticsearch.replication.repository.LeaderClusterRepository
import com.amazon.elasticsearch.replication.repository.RemoteClusterRestoreLeaderService
import com.amazon.elasticsearch.replication.rest.PauseIndexReplicationHandler
import com.amazon.elasticsearch.replication.rest.ReplicateIndexHandler
Expand Down Expand Up @@ -182,7 +182,7 @@ internal class ReplicationPlugin : Plugin(), ActionPlugin, PersistentTaskPlugin,
this.replicationMetadataManager = ReplicationMetadataManager(clusterService, client,
ReplicationMetadataStore(client, clusterService, xContentRegistry))
this.replicationSettings = ReplicationSettings(clusterService)
return listOf(RemoteClusterRepositoriesService(repositoriesService, clusterService), replicationMetadataManager, replicationSettings)
return listOf(LeaderClusterRepositoriesService(repositoriesService, clusterService), replicationMetadataManager, replicationSettings)
}

override fun getGuiceServiceClasses(): Collection<Class<out LifecycleComponent>> {
Expand Down Expand Up @@ -322,7 +322,7 @@ internal class ReplicationPlugin : Plugin(), ActionPlugin, PersistentTaskPlugin,
override fun getInternalRepositories(env: Environment, namedXContentRegistry: NamedXContentRegistry,
clusterService: ClusterService, recoverySettings: RecoverySettings): Map<String, Repository.Factory> {
val repoFactory = Repository.Factory { repoMetadata: RepositoryMetadata ->
RemoteClusterRepository(repoMetadata, client, clusterService, recoverySettings, replicationMetadataManager, replicationSettings) }
LeaderClusterRepository(repoMetadata, client, clusterService, recoverySettings, replicationMetadataManager, replicationSettings) }
return mapOf(REMOTE_REPOSITORY_TYPE to repoFactory)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,15 +68,15 @@ class TransportAutoFollowMasterNodeAction @Inject constructor(transportService:
throw ReplicationException("Failed to update empty autofollow pattern")
}
// Pattern is same for leader and follower
val followerFgacRole = request.assumeRoles?.get(ReplicateIndexRequest.FOLLOWER_FGAC_ROLE)
val leaderFgacRole = request.assumeRoles?.get(ReplicateIndexRequest.LEADER_FGAC_ROLE)
val followerClusterRole = request.assumeRoles?.get(ReplicateIndexRequest.FOLLOWER_CLUSTER_ROLE)
val leaderClusterRole = request.assumeRoles?.get(ReplicateIndexRequest.LEADER_CLUSTER_ROLE)

indexScopedSettings.validate(request.settings,
false,
false)

metadataManager.addAutofollowMetadata(request.patternName, request.connection, request.pattern!!,
ReplicationOverallState.RUNNING, user, followerFgacRole, leaderFgacRole, request.settings)
ReplicationOverallState.RUNNING, user, followerClusterRole, leaderClusterRole, request.settings)
startAutoFollowTask(request.connection, request.patternName)
}
AcknowledgedResponse(true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,48 +16,25 @@
package com.amazon.elasticsearch.replication.action.autofollow

import com.amazon.elasticsearch.replication.ReplicationException
import com.amazon.elasticsearch.replication.action.index.ReplicateIndexAction
import com.amazon.elasticsearch.replication.action.index.ReplicateIndexRequest
import com.amazon.elasticsearch.replication.action.index.ReplicateIndexResponse
import com.amazon.elasticsearch.replication.action.index.TransportReplicateIndexAction
import com.amazon.elasticsearch.replication.action.setup.SetupChecksAction
import com.amazon.elasticsearch.replication.action.setup.SetupChecksRequest
import com.amazon.elasticsearch.replication.action.setup.ValidatePermissionsRequest
import com.amazon.elasticsearch.replication.metadata.ReplicationMetadataManager
import com.amazon.elasticsearch.replication.metadata.ReplicationOverallState
import com.amazon.elasticsearch.replication.metadata.store.ReplicationContext
import com.amazon.elasticsearch.replication.task.autofollow.AutoFollowExecutor
import com.amazon.elasticsearch.replication.task.autofollow.AutoFollowParams
import com.amazon.elasticsearch.replication.util.SecurityContext
import com.amazon.elasticsearch.replication.util.completeWith
import com.amazon.elasticsearch.replication.util.coroutineContext
import com.amazon.elasticsearch.replication.util.overrideFgacRole
import com.amazon.elasticsearch.replication.util.persistentTasksService
import com.amazon.elasticsearch.replication.util.removeTask
import com.amazon.elasticsearch.replication.util.startTask
import com.amazon.elasticsearch.replication.util.suspendExecute
import com.amazon.opendistroforelasticsearch.commons.authuser.User
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.launch
import org.apache.logging.log4j.LogManager
import org.elasticsearch.ElasticsearchException
import org.elasticsearch.ResourceAlreadyExistsException
import org.elasticsearch.ResourceNotFoundException
import org.elasticsearch.action.ActionListener
import org.elasticsearch.action.support.ActionFilters
import org.elasticsearch.action.support.HandledTransportAction
import org.elasticsearch.action.support.master.AcknowledgedResponse
import org.elasticsearch.action.support.master.TransportMasterNodeAction
import org.elasticsearch.client.Client
import org.elasticsearch.client.node.NodeClient
import org.elasticsearch.cluster.ClusterState
import org.elasticsearch.cluster.block.ClusterBlockException
import org.elasticsearch.cluster.block.ClusterBlockLevel
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver
import org.elasticsearch.cluster.service.ClusterService
import org.elasticsearch.common.inject.Inject
import org.elasticsearch.common.io.stream.StreamInput
import org.elasticsearch.tasks.Task
import org.elasticsearch.threadpool.ThreadPool
import org.elasticsearch.transport.TransportService
Expand All @@ -81,10 +58,10 @@ class TransportUpdateAutoFollowPatternAction @Inject constructor(transportServic
listener.completeWith {
if (request.action == UpdateAutoFollowPatternRequest.Action.ADD) {
// Pattern is same for leader and follower
val followerFgacRole = request.assumeRoles?.get(ReplicateIndexRequest.FOLLOWER_FGAC_ROLE)
val leaderFgacRole = request.assumeRoles?.get(ReplicateIndexRequest.LEADER_FGAC_ROLE)
val setupChecksRequest = SetupChecksRequest(ReplicationContext(request.pattern!!, user?.overrideFgacRole(followerFgacRole)),
ReplicationContext(request.pattern!!, user?.overrideFgacRole(leaderFgacRole)),
val followerClusterRole = request.assumeRoles?.get(ReplicateIndexRequest.FOLLOWER_CLUSTER_ROLE)
val leaderClusterRole = request.assumeRoles?.get(ReplicateIndexRequest.LEADER_CLUSTER_ROLE)
val setupChecksRequest = SetupChecksRequest(ReplicationContext(request.pattern!!, user?.overrideFgacRole(followerClusterRole)),
ReplicationContext(request.pattern!!, user?.overrideFgacRole(leaderClusterRole)),
request.connection)
val setupChecksRes = client.suspendExecute(SetupChecksAction.INSTANCE, setupChecksRequest)
if(!setupChecksRes.isAcknowledged) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ class UpdateAutoFollowPatternRequest: AcknowledgedRequest<UpdateAutoFollowPatter
companion object {
private val AUTOFOLLOW_REQ_PARSER = ObjectParser<UpdateAutoFollowPatternRequest, Void>("AutoFollowRequestParser") { UpdateAutoFollowPatternRequest() }
init {
AUTOFOLLOW_REQ_PARSER.declareString(UpdateAutoFollowPatternRequest::connection::set, ParseField("connection"))
AUTOFOLLOW_REQ_PARSER.declareString(UpdateAutoFollowPatternRequest::connection::set, ParseField("leader_alias"))
AUTOFOLLOW_REQ_PARSER.declareString(UpdateAutoFollowPatternRequest::patternName::set, ParseField("name"))
AUTOFOLLOW_REQ_PARSER.declareString(UpdateAutoFollowPatternRequest::pattern::set, ParseField("pattern"))

Expand Down Expand Up @@ -88,11 +88,11 @@ class UpdateAutoFollowPatternRequest: AcknowledgedRequest<UpdateAutoFollowPatter
patternName = inp.readString()
pattern = inp.readOptionalString()
action = inp.readEnum(Action::class.java)
var leaderFgacRole = inp.readOptionalString()
var followerFgacRole = inp.readOptionalString()
var leaderClusterRole = inp.readOptionalString()
var followerClusterRole = inp.readOptionalString()
assumeRoles = HashMap()
if(leaderFgacRole != null) assumeRoles!![ReplicateIndexRequest.LEADER_FGAC_ROLE] = leaderFgacRole
if(followerFgacRole != null) assumeRoles!![ReplicateIndexRequest.FOLLOWER_FGAC_ROLE] = followerFgacRole
if(leaderClusterRole != null) assumeRoles!![ReplicateIndexRequest.LEADER_CLUSTER_ROLE] = leaderClusterRole
if(followerClusterRole != null) assumeRoles!![ReplicateIndexRequest.FOLLOWER_CLUSTER_ROLE] = followerClusterRole
settings = Settings.readSettingsFromStream(inp)
}

Expand All @@ -105,10 +105,10 @@ class UpdateAutoFollowPatternRequest: AcknowledgedRequest<UpdateAutoFollowPatter
validationException.addValidationError("Missing connection or name in the request")
}

if(assumeRoles != null && (assumeRoles!!.size < 2 || assumeRoles!![ReplicateIndexRequest.LEADER_FGAC_ROLE] == null ||
assumeRoles!![ReplicateIndexRequest.FOLLOWER_FGAC_ROLE] == null)) {
validationException.addValidationError("Need roles for ${ReplicateIndexRequest.LEADER_FGAC_ROLE} and " +
"${ReplicateIndexRequest.FOLLOWER_FGAC_ROLE}")
if(assumeRoles != null && (assumeRoles!!.size < 2 || assumeRoles!![ReplicateIndexRequest.LEADER_CLUSTER_ROLE] == null ||
assumeRoles!![ReplicateIndexRequest.FOLLOWER_CLUSTER_ROLE] == null)) {
validationException.addValidationError("Need roles for ${ReplicateIndexRequest.LEADER_CLUSTER_ROLE} and " +
"${ReplicateIndexRequest.FOLLOWER_CLUSTER_ROLE}")
}

if(action == Action.REMOVE) {
Expand All @@ -128,22 +128,22 @@ class UpdateAutoFollowPatternRequest: AcknowledgedRequest<UpdateAutoFollowPatter
out.writeString(patternName)
out.writeOptionalString(pattern)
out.writeEnum(action)
out.writeOptionalString(assumeRoles?.get(ReplicateIndexRequest.LEADER_FGAC_ROLE))
out.writeOptionalString(assumeRoles?.get(ReplicateIndexRequest.FOLLOWER_FGAC_ROLE))
out.writeOptionalString(assumeRoles?.get(ReplicateIndexRequest.LEADER_CLUSTER_ROLE))
out.writeOptionalString(assumeRoles?.get(ReplicateIndexRequest.FOLLOWER_CLUSTER_ROLE))
Settings.writeSettingsToStream(settings, out)
}

override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder {
builder.startObject()
builder.field("connection", connection)
builder.field("leader_alias", connection)
builder.field("pattern_name", patternName)
builder.field("pattern", pattern)
builder.field("action", action.name)
if(assumeRoles != null) {
builder.field("assume_roles")
builder.startObject()
builder.field("remote_fgac_role", assumeRoles!!.get(ReplicateIndexRequest.LEADER_FGAC_ROLE))
builder.field("local_fgac_role", assumeRoles!!.get(ReplicateIndexRequest.FOLLOWER_FGAC_ROLE))
builder.field("leader_cluster_role", assumeRoles!!.get(ReplicateIndexRequest.LEADER_CLUSTER_ROLE))
builder.field("follower_cluster_role", assumeRoles!!.get(ReplicateIndexRequest.FOLLOWER_CLUSTER_ROLE))
builder.endObject()
}

Expand Down
Loading

0 comments on commit b6a299f

Please sign in to comment.