Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
jasontedor committed Feb 9, 2019
1 parent 4265a13 commit 38a5170
Show file tree
Hide file tree
Showing 4 changed files with 13 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,6 @@
import org.elasticsearch.common.settings.IndexScopedSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsFilter;
import org.elasticsearch.index.seqno.RetentionLeases;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.persistent.CompletionPersistentTaskAction;
import org.elasticsearch.persistent.RemovePersistentTaskAction;
Expand Down Expand Up @@ -522,7 +521,7 @@ public <Request extends ActionRequest, Response extends ActionResponse> void reg
actions.register(DeletePipelineAction.INSTANCE, DeletePipelineTransportAction.class);
actions.register(SimulatePipelineAction.INSTANCE, SimulatePipelineTransportAction.class);

actions.register(RetentionLeaseAction.INSTANCE, RetentionLeaseAction.TransportAction.class);
actions.register(AddRetentionLeaseAction.INSTANCE, AddRetentionLeaseAction.TransportAction.class);

actionPlugins.stream().flatMap(p -> p.getActions().stream()).forEach(actions::register);

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package org.elasticsearch.action;

import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.support.single.shard.SingleShardRequest;
import org.elasticsearch.action.support.single.shard.TransportSingleShardAction;
import org.elasticsearch.cluster.ClusterState;
Expand All @@ -20,17 +19,16 @@
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

import java.io.Closeable;
import java.io.IOException;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;

public class RetentionLeaseAction extends Action<RetentionLeaseAction.Response> {
public class AddRetentionLeaseAction extends Action<AddRetentionLeaseAction.Response> {

public static final RetentionLeaseAction INSTANCE = new RetentionLeaseAction();
public static final AddRetentionLeaseAction INSTANCE = new AddRetentionLeaseAction();
public static final String NAME = "indices:data/write/retention_lease";

private RetentionLeaseAction() {
private AddRetentionLeaseAction() {
super(NAME);
}

Expand Down Expand Up @@ -184,5 +182,4 @@ public Response newResponse() {
return new Response();
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.RetentionLeaseAction;
import org.elasticsearch.action.AddRetentionLeaseAction;
import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreClusterStateListener;
import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotRequest;
import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
Expand All @@ -31,7 +31,6 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.seqno.RetentionLease;
import org.elasticsearch.license.LicenseUtils;
import org.elasticsearch.snapshots.RestoreInfo;
import org.elasticsearch.snapshots.RestoreService;
Expand Down Expand Up @@ -173,8 +172,8 @@ private void afterRestoreStarted(Client clientWithHeaders, PutFollowAction.Reque
ActionListener<PutFollowAction.Response> originalListener,
RestoreService.RestoreCompletionResponse response) {
clientWithHeaders.execute(
RetentionLeaseAction.INSTANCE,
new RetentionLeaseAction.Request());
AddRetentionLeaseAction.INSTANCE,
new AddRetentionLeaseAction.Request());


final ActionListener<PutFollowAction.Response> listener;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,7 @@
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
import org.apache.lucene.index.IndexCommit;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.RetentionLeaseAction;
import org.elasticsearch.action.StepListener;
import org.elasticsearch.action.AddRetentionLeaseAction;
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
Expand All @@ -36,7 +34,6 @@
import org.elasticsearch.common.util.CombinedRateLimiter;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.engine.EngineException;
import org.elasticsearch.index.seqno.RetentionLease;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardRecoveryException;
import org.elasticsearch.index.shard.ShardId;
Expand Down Expand Up @@ -288,11 +285,12 @@ public void restoreShard(IndexShard indexShard, SnapshotId snapshotId, Version v
Index leaderIndex = new Index(shardId.getIndexName(), leaderUUID);
ShardId leaderShardId = new ShardId(leaderIndex, shardId.getId());

final String retentionLeaseId = leaderUUID + "-" + shardId.getIndex().getUUID();

final Client remoteClient = client.getRemoteClusterClient(remoteClusterAlias);
RetentionLeaseAction.Response response = remoteClient
.execute(RetentionLeaseAction.INSTANCE, new RetentionLeaseAction.Request(leaderShardId, retentionLeaseId, 0, "ccr"))

final String retentionLeaseId =
indexShard.shardId().getIndex().getUUID() + "-following-" + leaderUUID + "-" + UUIDs.randomBase64UUID();
remoteClient
.execute(AddRetentionLeaseAction.INSTANCE, new AddRetentionLeaseAction.Request(leaderShardId, retentionLeaseId, 0, "ccr"))
.actionGet(ccrSettings.getRecoveryActionTimeout());

// TODO: There should be some local timeout. And if the remote cluster returns an unknown session
Expand Down

0 comments on commit 38a5170

Please sign in to comment.