From b1ee4fd793111823d47ad59a6d81fa4afe883da9 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Wed, 12 Dec 2018 11:47:11 -0700 Subject: [PATCH 01/14] Bring over start session work --- .../java/org/elasticsearch/xpack/ccr/Ccr.java | 8 + .../DeleteCcrRestoreSessionAction.java | 85 ++++++++++ .../DeleteCcrRestoreSessionRequest.java | 40 +++++ .../PutCcrRestoreSessionAction.java | 159 ++++++++++++++++++ .../PutCcrRestoreSessionRequest.java | 49 ++++++ .../repository/CcrRestoreSourceService.java | 59 +++++++ 6 files changed, 400 insertions(+) create mode 100644 x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/DeleteCcrRestoreSessionAction.java create mode 100644 x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/DeleteCcrRestoreSessionRequest.java create mode 100644 x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/PutCcrRestoreSessionAction.java create mode 100644 x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/PutCcrRestoreSessionRequest.java create mode 100644 x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRestoreSourceService.java diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java index b25bd71c67ffc..3b270ad11362a 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java @@ -58,10 +58,13 @@ import org.elasticsearch.xpack.ccr.action.TransportUnfollowAction; import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsAction; import org.elasticsearch.xpack.ccr.action.bulk.TransportBulkShardOperationsAction; +import org.elasticsearch.xpack.ccr.action.repositories.DeleteCcrRestoreSessionAction; import org.elasticsearch.xpack.ccr.action.repositories.DeleteInternalCcrRepositoryAction; +import org.elasticsearch.xpack.ccr.action.repositories.PutCcrRestoreSessionAction; import org.elasticsearch.xpack.ccr.action.repositories.PutInternalCcrRepositoryAction; import org.elasticsearch.xpack.ccr.index.engine.FollowingEngineFactory; import org.elasticsearch.xpack.ccr.repository.CcrRepository; +import org.elasticsearch.xpack.ccr.repository.CcrRestoreSourceService; import org.elasticsearch.xpack.ccr.rest.RestCcrStatsAction; import org.elasticsearch.xpack.ccr.rest.RestDeleteAutoFollowPatternAction; import org.elasticsearch.xpack.ccr.rest.RestFollowStatsAction; @@ -156,6 +159,7 @@ public Collection createComponents( return Arrays.asList( ccrLicenseChecker, + new CcrRestoreSourceService(settings), new AutoFollowCoordinator(client, clusterService, ccrLicenseChecker) ); } @@ -182,6 +186,10 @@ public List> getPersistentTasksExecutor(ClusterServic PutInternalCcrRepositoryAction.TransportPutInternalRepositoryAction.class), new ActionHandler<>(DeleteInternalCcrRepositoryAction.INSTANCE, DeleteInternalCcrRepositoryAction.TransportDeleteInternalRepositoryAction.class), + new ActionHandler<>(PutCcrRestoreSessionAction.INSTANCE, + PutCcrRestoreSessionAction.TransportPutCcrRestoreSessionAction.class), + new ActionHandler<>(DeleteCcrRestoreSessionAction.INSTANCE, + DeleteCcrRestoreSessionAction.TransportDeleteCcrRestoreSessionAction.class), // stats action new ActionHandler<>(FollowStatsAction.INSTANCE, TransportFollowStatsAction.class), new ActionHandler<>(CcrStatsAction.INSTANCE, TransportCcrStatsAction.class), diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/DeleteCcrRestoreSessionAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/DeleteCcrRestoreSessionAction.java new file mode 100644 index 0000000000000..6413889cd22ad --- /dev/null +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/DeleteCcrRestoreSessionAction.java @@ -0,0 +1,85 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.ccr.action.repositories; + +import org.elasticsearch.action.Action; +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.single.shard.TransportSingleShardAction; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.routing.ShardsIterator; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.UUIDs; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.index.engine.Engine; +import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.index.shard.ShardNotFoundException; +import org.elasticsearch.index.store.Store; +import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.ccr.repository.CcrRestoreSourceService; + +import java.io.IOException; + +public class DeleteCcrRestoreSessionAction extends Action { + + public static final DeleteCcrRestoreSessionAction INSTANCE = new DeleteCcrRestoreSessionAction(); + private static final String NAME = "internal:admin/ccr/restore/session/delete"; + + protected DeleteCcrRestoreSessionAction() { + super(NAME); + } + + @Override + public DeleteCcrRestoreSessionResponse newResponse() { + return new DeleteCcrRestoreSessionResponse(); + } + + public static class TransportDeleteCcrRestoreSessionAction + extends TransportSingleShardAction { + + private final IndicesService indicesService; + private final CcrRestoreSourceService ccrRestoreService; + + @Inject + public TransportDeleteCcrRestoreSessionAction(ThreadPool threadPool, ClusterService clusterService, ActionFilters actionFilters, + IndexNameExpressionResolver resolver, TransportService transportService, + IndicesService indicesService, CcrRestoreSourceService ccrRestoreService) { + super(NAME, threadPool, clusterService, transportService, actionFilters, resolver, DeleteCcrRestoreSessionRequest::new, + ThreadPool.Names.GENERIC); + this.indicesService = indicesService; + this.ccrRestoreService = ccrRestoreService; + } + + @Override + protected DeleteCcrRestoreSessionResponse shardOperation(DeleteCcrRestoreSessionRequest request, ShardId shardId) throws IOException { + // TODO: Implement + return new DeleteCcrRestoreSessionResponse(); + } + + @Override + protected DeleteCcrRestoreSessionResponse newResponse() { + return new DeleteCcrRestoreSessionResponse(); + } + + @Override + protected boolean resolveIndex(DeleteCcrRestoreSessionRequest request) { + return false; + } + + @Override + protected ShardsIterator shards(ClusterState state, InternalRequest request) { + return null; + } + } + + public static class DeleteCcrRestoreSessionResponse extends ActionResponse { + } +} diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/DeleteCcrRestoreSessionRequest.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/DeleteCcrRestoreSessionRequest.java new file mode 100644 index 0000000000000..d80099912a990 --- /dev/null +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/DeleteCcrRestoreSessionRequest.java @@ -0,0 +1,40 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.ccr.action.repositories; + +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.support.single.shard.SingleShardRequest; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.index.shard.ShardId; + +import java.io.IOException; + +public class DeleteCcrRestoreSessionRequest extends SingleShardRequest { + + public DeleteCcrRestoreSessionRequest() { + } + + public DeleteCcrRestoreSessionRequest(ShardId shardId) { + super(shardId.getIndexName()); + } + + @Override + public ActionRequestValidationException validate() { + return null; + } + + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + } +} diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/PutCcrRestoreSessionAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/PutCcrRestoreSessionAction.java new file mode 100644 index 0000000000000..f2ed0e916cc82 --- /dev/null +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/PutCcrRestoreSessionAction.java @@ -0,0 +1,159 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.ccr.action.repositories; + +import org.elasticsearch.action.Action; +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.single.shard.TransportSingleShardAction; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.routing.IndexShardRoutingTable; +import org.elasticsearch.cluster.routing.ShardsIterator; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.UUIDs; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.index.engine.Engine; +import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.index.shard.ShardNotFoundException; +import org.elasticsearch.index.store.Store; +import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.ccr.repository.CcrRestoreSourceService; + +import java.io.IOException; + +public class PutCcrRestoreSessionAction extends Action { + + public static final PutCcrRestoreSessionAction INSTANCE = new PutCcrRestoreSessionAction(); + private static final String NAME = "internal:admin/ccr/restore/session/put"; + + private PutCcrRestoreSessionAction() { + super(NAME); + } + + @Override + public PutCcrRestoreSessionResponse newResponse() { + throw new UnsupportedOperationException(); + } + + @Override + public Writeable.Reader getResponseReader() { + return PutCcrRestoreSessionAction.PutCcrRestoreSessionResponse::new; + } + + public static class TransportPutCcrRestoreSessionAction + extends TransportSingleShardAction { + + private final IndicesService indicesService; + private final CcrRestoreSourceService ccrRestoreService; + + @Inject + public TransportPutCcrRestoreSessionAction(ThreadPool threadPool, ClusterService clusterService, ActionFilters actionFilters, + IndexNameExpressionResolver resolver, TransportService transportService, + IndicesService indicesService, CcrRestoreSourceService ccrRestoreService) { + super(NAME, threadPool, clusterService, transportService, actionFilters, resolver, PutCcrRestoreSessionRequest::new, + ThreadPool.Names.GENERIC); + this.indicesService = indicesService; + this.ccrRestoreService = ccrRestoreService; + } + + @Override + protected PutCcrRestoreSessionResponse shardOperation(PutCcrRestoreSessionRequest request, ShardId shardId) throws IOException { + IndexShard indexShard = indicesService.getShardOrNull(shardId); + if (indexShard == null) { + throw new ShardNotFoundException(shardId); + } + Engine.IndexCommitRef commit = indexShard.acquireSafeIndexCommit(); + String sessionUUID = UUIDs.randomBase64UUID(); + ccrRestoreService.addCommit(sessionUUID, commit); + final Store.MetadataSnapshot snapshot; + indexShard.store().incRef(); + try { + snapshot = indexShard.store().getMetadata(commit.getIndexCommit()); + } finally { + indexShard.store().decRef(); + } + return new PutCcrRestoreSessionResponse(sessionUUID, indexShard.routingEntry().currentNodeId(), snapshot); + } + + @Override + protected PutCcrRestoreSessionResponse newResponse() { + return new PutCcrRestoreSessionResponse(); + } + + @Override + protected boolean resolveIndex(PutCcrRestoreSessionRequest request) { + return false; + } + + @Override + protected ShardsIterator shards(ClusterState state, InternalRequest request) { + final ShardId shardId = request.request().getShardId(); + // The index uuid is not correct if we restore with a rename + IndexShardRoutingTable shardRoutingTable = state.routingTable().shardRoutingTable(shardId.getIndexName(), shardId.id()); + return shardRoutingTable.primaryShardIt(); + } + } + + + public static class PutCcrRestoreSessionResponse extends ActionResponse { + + private String sessionUUID; + private String nodeId; + private Store.MetadataSnapshot metaData; + + PutCcrRestoreSessionResponse() { + } + + PutCcrRestoreSessionResponse(String sessionUUID, String nodeId, Store.MetadataSnapshot metaData) { + this.sessionUUID = sessionUUID; + this.nodeId = nodeId; + this.metaData = metaData; + } + + PutCcrRestoreSessionResponse(StreamInput streamInput) throws IOException { + super(streamInput); + sessionUUID = streamInput.readString(); + nodeId = streamInput.readString(); + metaData = new Store.MetadataSnapshot(streamInput); + } + + @Override + public void readFrom(StreamInput streamInput) throws IOException { + super.readFrom(streamInput); + sessionUUID = streamInput.readString(); + nodeId = streamInput.readString(); + metaData = new Store.MetadataSnapshot(streamInput); + } + + @Override + public void writeTo(StreamOutput streamOutput) throws IOException { + super.writeTo(streamOutput); + streamOutput.writeString(sessionUUID); + streamOutput.writeString(nodeId); + metaData.writeTo(streamOutput); + } + + public String getSessionUUID() { + return sessionUUID; + } + + public String getNodeId() { + return nodeId; + } + + public Store.MetadataSnapshot getMetaData() { + return metaData; + } + } +} diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/PutCcrRestoreSessionRequest.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/PutCcrRestoreSessionRequest.java new file mode 100644 index 0000000000000..78cbf6ad8c636 --- /dev/null +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/PutCcrRestoreSessionRequest.java @@ -0,0 +1,49 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.ccr.action.repositories; + +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.support.single.shard.SingleShardRequest; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.index.shard.ShardId; + +import java.io.IOException; + +public class PutCcrRestoreSessionRequest extends SingleShardRequest { + + private ShardId shardId; + + public PutCcrRestoreSessionRequest() { + } + + public PutCcrRestoreSessionRequest(ShardId shardId) { + super(shardId.getIndexName()); + this.shardId = shardId; + } + + @Override + public ActionRequestValidationException validate() { + return null; + } + + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + shardId = ShardId.readShardId(in); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + shardId.writeTo(out); + } + + public ShardId getShardId() { + return shardId; + } +} diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRestoreSourceService.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRestoreSourceService.java new file mode 100644 index 0000000000000..4f26e7105cf79 --- /dev/null +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRestoreSourceService.java @@ -0,0 +1,59 @@ +package org.elasticsearch.xpack.ccr.repository; + +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.common.component.AbstractLifecycleComponent; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.ConcurrentCollections; +import org.elasticsearch.core.internal.io.IOUtils; +import org.elasticsearch.index.engine.Engine; + +import java.io.IOException; +import java.util.Map; + +public class CcrRestoreSourceService extends AbstractLifecycleComponent { + + private static final Logger logger = LogManager.getLogger(CcrRestoreSourceService.class); + + final private Map onGoingRestores = ConcurrentCollections.newConcurrentMap(); + + public CcrRestoreSourceService(Settings settings) { + super(settings); + } + + @Override + protected void doStart() { + + } + + @Override + protected void doStop() { + + } + + @Override + protected void doClose() throws IOException { + IOUtils.closeWhileHandlingException(onGoingRestores.values()); + } + + public void addCommit(String sessionUUID, Engine.IndexCommitRef commit) { + onGoingRestores.put(sessionUUID, commit); + } + + public Engine.IndexCommitRef getCommit(String sessionUUID) { + Engine.IndexCommitRef commit = onGoingRestores.get(sessionUUID); + if (commit == null) { + throw new ElasticsearchException("commit for [" + sessionUUID + "] not found"); + } + return commit; + } + + public void closeCommit(String sessionUUID) { + Engine.IndexCommitRef commit = onGoingRestores.remove(sessionUUID); + if (commit == null) { + throw new ElasticsearchException("commit for [" + sessionUUID + "] not found"); + } + IOUtils.closeWhileHandlingException(commit); + } +} From 2479f94d0dce11e887c48f430dd014b45d161f8a Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Wed, 12 Dec 2018 12:46:07 -0700 Subject: [PATCH 02/14] Change file relationship --- .../common/util/iterable/Iterables.java | 2 +- .../PutCcrRestoreSessionAction.java | 39 +-- .../PutCcrRestoreSessionRequest.java | 31 ++- .../xpack/ccr/repository/CcrRepository.java | 222 +++++++++++++++++- 4 files changed, 261 insertions(+), 33 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/common/util/iterable/Iterables.java b/server/src/main/java/org/elasticsearch/common/util/iterable/Iterables.java index 2852e33eb4307..3783de95585cd 100644 --- a/server/src/main/java/org/elasticsearch/common/util/iterable/Iterables.java +++ b/server/src/main/java/org/elasticsearch/common/util/iterable/Iterables.java @@ -34,7 +34,7 @@ public Iterables() { public static Iterable concat(Iterable... inputs) { Objects.requireNonNull(inputs); - return new ConcatenatedIterable(inputs); + return new ConcatenatedIterable<>(inputs); } static class ConcatenatedIterable implements Iterable { diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/PutCcrRestoreSessionAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/PutCcrRestoreSessionAction.java index f2ed0e916cc82..1450c3489e533 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/PutCcrRestoreSessionAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/PutCcrRestoreSessionAction.java @@ -25,12 +25,15 @@ import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardNotFoundException; import org.elasticsearch.index.store.Store; +import org.elasticsearch.index.store.StoreFileMetaData; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.ccr.repository.CcrRestoreSourceService; import java.io.IOException; +import java.util.ArrayList; +import java.util.List; public class PutCcrRestoreSessionAction extends Action { @@ -83,7 +86,7 @@ protected PutCcrRestoreSessionResponse shardOperation(PutCcrRestoreSessionReques } finally { indexShard.store().decRef(); } - return new PutCcrRestoreSessionResponse(sessionUUID, indexShard.routingEntry().currentNodeId(), snapshot); + return new PutCcrRestoreSessionResponse(indexShard.routingEntry().currentNodeId(), new ArrayList<>(), new ArrayList<>()); } @Override @@ -108,52 +111,52 @@ protected ShardsIterator shards(ClusterState state, InternalRequest request) { public static class PutCcrRestoreSessionResponse extends ActionResponse { - private String sessionUUID; private String nodeId; - private Store.MetadataSnapshot metaData; + private List identicalFiles; + private List filesToRecover; PutCcrRestoreSessionResponse() { } - PutCcrRestoreSessionResponse(String sessionUUID, String nodeId, Store.MetadataSnapshot metaData) { - this.sessionUUID = sessionUUID; + PutCcrRestoreSessionResponse(String nodeId, List identicalFiles, List filesToRecover) { this.nodeId = nodeId; - this.metaData = metaData; + this.identicalFiles = identicalFiles; + this.filesToRecover = filesToRecover; } PutCcrRestoreSessionResponse(StreamInput streamInput) throws IOException { super(streamInput); - sessionUUID = streamInput.readString(); nodeId = streamInput.readString(); - metaData = new Store.MetadataSnapshot(streamInput); + identicalFiles = streamInput.readList(StoreFileMetaData::new); + filesToRecover = streamInput.readList(StoreFileMetaData::new); } @Override public void readFrom(StreamInput streamInput) throws IOException { super.readFrom(streamInput); - sessionUUID = streamInput.readString(); nodeId = streamInput.readString(); - metaData = new Store.MetadataSnapshot(streamInput); + identicalFiles = streamInput.readList(StoreFileMetaData::new); + filesToRecover = streamInput.readList(StoreFileMetaData::new); } @Override public void writeTo(StreamOutput streamOutput) throws IOException { super.writeTo(streamOutput); - streamOutput.writeString(sessionUUID); streamOutput.writeString(nodeId); - metaData.writeTo(streamOutput); - } - - public String getSessionUUID() { - return sessionUUID; + streamOutput.writeList(identicalFiles); + streamOutput.writeList(filesToRecover); } public String getNodeId() { return nodeId; } - public Store.MetadataSnapshot getMetaData() { - return metaData; + public List getIdenticalFiles() { + return identicalFiles; + } + + public List getFilesToRecover() { + return filesToRecover; } } } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/PutCcrRestoreSessionRequest.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/PutCcrRestoreSessionRequest.java index 78cbf6ad8c636..dee0b9621335d 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/PutCcrRestoreSessionRequest.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/PutCcrRestoreSessionRequest.java @@ -11,19 +11,24 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.index.store.Store; import java.io.IOException; public class PutCcrRestoreSessionRequest extends SingleShardRequest { + private String sessionUUID; private ShardId shardId; + private Store.MetadataSnapshot metaData; public PutCcrRestoreSessionRequest() { } - public PutCcrRestoreSessionRequest(ShardId shardId) { + public PutCcrRestoreSessionRequest(String sessionUUID, ShardId shardId, Store.MetadataSnapshot metaData) { super(shardId.getIndexName()); + this.sessionUUID = sessionUUID; this.shardId = shardId; + this.metaData = metaData; } @Override @@ -32,18 +37,30 @@ public ActionRequestValidationException validate() { } @Override - public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); - shardId = ShardId.readShardId(in); + public void readFrom(StreamInput streamInput) throws IOException { + super.readFrom(streamInput); + sessionUUID = streamInput.readString(); + shardId = ShardId.readShardId(streamInput); + metaData = new Store.MetadataSnapshot(streamInput); } @Override - public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); - shardId.writeTo(out); + public void writeTo(StreamOutput streamOutput) throws IOException { + super.writeTo(streamOutput); + streamOutput.writeString(sessionUUID); + shardId.writeTo(streamOutput); + metaData.writeTo(streamOutput); + } + + public String getSessionUUID() { + return sessionUUID; } public ShardId getShardId() { return shardId; } + + public Store.MetadataSnapshot getMetaData() { + return metaData; + } } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java index cc863c069899a..b4a57fb7583c2 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java @@ -6,7 +6,11 @@ package org.elasticsearch.xpack.ccr.repository; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.lucene.index.CorruptIndexException; import org.apache.lucene.index.IndexCommit; +import org.apache.lucene.index.SegmentInfos; import org.elasticsearch.Version; import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; import org.elasticsearch.action.support.PlainActionFuture; @@ -16,16 +20,21 @@ import org.elasticsearch.cluster.metadata.RepositoryMetaData; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.Strings; +import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.component.AbstractLifecycleComponent; +import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.iterable.Iterables; import org.elasticsearch.index.Index; -import org.elasticsearch.index.engine.EngineException; +import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShardRecoveryException; import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.index.snapshots.IndexShardRestoreFailedException; import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus; import org.elasticsearch.index.store.Store; +import org.elasticsearch.index.store.StoreFileMetaData; import org.elasticsearch.indices.recovery.RecoveryState; import org.elasticsearch.repositories.IndexId; import org.elasticsearch.repositories.Repository; @@ -36,9 +45,14 @@ import org.elasticsearch.snapshots.SnapshotState; import org.elasticsearch.xpack.ccr.Ccr; import org.elasticsearch.xpack.ccr.CcrLicenseChecker; +import org.elasticsearch.xpack.ccr.action.repositories.DeleteCcrRestoreSessionAction; +import org.elasticsearch.xpack.ccr.action.repositories.DeleteCcrRestoreSessionRequest; +import org.elasticsearch.xpack.ccr.action.repositories.PutCcrRestoreSessionAction; +import org.elasticsearch.xpack.ccr.action.repositories.PutCcrRestoreSessionRequest; import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -51,6 +65,8 @@ */ public class CcrRepository extends AbstractLifecycleComponent implements Repository { + private static final Logger logger = LogManager.getLogger(CcrRestoreSourceService.class); + public static final String LATEST = "_latest_"; public static final String TYPE = "_ccr_"; public static final String NAME_PREFIX = "_ccr_"; @@ -81,7 +97,7 @@ protected void doStop() { } @Override - protected void doClose() throws IOException { + protected void doClose() { } @@ -227,14 +243,35 @@ public void snapshotShard(IndexShard shard, Store store, SnapshotId snapshotId, @Override public void restoreShard(IndexShard indexShard, SnapshotId snapshotId, Version version, IndexId indexId, ShardId shardId, RecoveryState recoveryState) { - final Store store = indexShard.store(); - store.incRef(); + Client remoteClient = client.getRemoteClusterClient(remoteClusterAlias); + String sessionUUID = UUIDs.randomBase64UUID(); + Store.MetadataSnapshot recoveryTargetMetadata; + try { + // this will throw an IOException if the store has no segments infos file. The + // store can still have existing files but they will be deleted just before being + // restored. + recoveryTargetMetadata = indexShard.snapshotStoreMetadata(); + } catch (IndexNotFoundException e) { + // happens when restore to an empty shard, not a big deal + logger.trace("[{}] [{}] restoring from to an empty shard", shardId, SNAPSHOT_ID); + recoveryTargetMetadata = Store.MetadataSnapshot.EMPTY; + } catch (IOException e) { + logger.warn("{} Can't read metadata from store, will not reuse any local file while restoring", shardId, e); + recoveryTargetMetadata = Store.MetadataSnapshot.EMPTY; + } + + + PutCcrRestoreSessionAction.PutCcrRestoreSessionResponse response = remoteClient.execute(PutCcrRestoreSessionAction.INSTANCE, + new PutCcrRestoreSessionRequest(sessionUUID, shardId, recoveryTargetMetadata)).actionGet(); + String nodeId = response.getNodeId(); + try { - store.createEmpty(indexShard.indexSettings().getIndexMetaData().getCreationVersion().luceneVersion); - } catch (EngineException | IOException e) { + RestoreSession restoreSession = new RestoreSession(remoteClient, null, nodeId, indexShard, recoveryState, null); + restoreSession.restore(); + } catch (Exception e) { throw new IndexShardRecoveryException(shardId, "failed to recover from gateway", e); } finally { - store.decRef(); + remoteClient.execute(DeleteCcrRestoreSessionAction.INSTANCE, new DeleteCcrRestoreSessionRequest()).actionGet(); } } @@ -242,4 +279,175 @@ public void restoreShard(IndexShard indexShard, SnapshotId snapshotId, Version v public IndexShardSnapshotStatus getShardSnapshotStatus(SnapshotId snapshotId, Version version, IndexId indexId, ShardId shardId) { throw new UnsupportedOperationException("Unsupported for repository of type: " + TYPE); } + + private static class RestoreSession { + + private final int BUFFER_SIZE = 1 << 16; + + private final Client remoteClient; + private final String sessionUUID; + private final String nodeId; + private final IndexShard indexShard; + private final RecoveryState recoveryState; + private final Store.MetadataSnapshot sourceMetaData; + + RestoreSession(Client remoteClient, String sessionUUID, String nodeId, IndexShard indexShard, RecoveryState recoveryState, + Store.MetadataSnapshot sourceMetaData) { + this.remoteClient = remoteClient; + this.sessionUUID = sessionUUID; + this.nodeId = nodeId; + this.indexShard = indexShard; + this.recoveryState = recoveryState; + this.sourceMetaData = sourceMetaData; + } + + void restore() throws IOException { + final Store store = indexShard.store(); + store.incRef(); + try { + final ShardId shardId = indexShard.shardId(); + logger.debug("[{}] repository restoring shard [{}]", CcrRepository.TYPE, shardId); + + Store.MetadataSnapshot recoveryTargetMetadata; + try { + // this will throw an IOException if the store has no segments infos file. The + // store can still have existing files but they will be deleted just before being + // restored. + recoveryTargetMetadata = indexShard.snapshotStoreMetadata(); + } catch (IndexNotFoundException e) { + // happens when restore to an empty shard, not a big deal + logger.trace("[{}] [{}] restoring from to an empty shard", shardId, SNAPSHOT_ID); + recoveryTargetMetadata = Store.MetadataSnapshot.EMPTY; + } catch (IOException e) { + logger.warn("{} Can't read metadata from store, will not reuse any local file while restoring", shardId, e); + recoveryTargetMetadata = Store.MetadataSnapshot.EMPTY; + } + + final StoreFileMetaData restoredSegmentsFile = sourceMetaData.getSegmentsFile(); + if (restoredSegmentsFile == null) { + throw new IndexShardRestoreFailedException(shardId, "Snapshot has no segments file"); + } + + final Store.RecoveryDiff diff = sourceMetaData.recoveryDiff(recoveryTargetMetadata); + for (StoreFileMetaData fileMetaData : diff.identical) { + recoveryState.getIndex().addFileDetail(fileMetaData.name(), fileMetaData.length(), true); + logger.trace("[{}] not_recovering [{}] from [{}], exists in local store and is same", shardId, fileMetaData.name()); + } + + List filesToRecover = new ArrayList<>(); + for (StoreFileMetaData fileMetaData : Iterables.concat(diff.different, diff.missing)) { + filesToRecover.add(fileMetaData); + recoveryState.getIndex().addFileDetail(fileMetaData.name(), fileMetaData.length(), false); + logger.trace("[{}] recovering [{}], exists in local store but is different", shardId, fileMetaData.name()); + } + + if (filesToRecover.isEmpty()) { + logger.trace("no files to recover, all exists within the local store"); + } + + // list of all existing store files + final List deleteIfExistFiles = Arrays.asList(store.directory().listAll()); + + // restore the files from the snapshot to the Lucene store + for (final StoreFileMetaData fileToRecover : filesToRecover) { + // if a file with a same physical name already exist in the store we need to delete it + // before restoring it from the snapshot. We could be lenient and try to reuse the existing + // store files (and compare their names/length/checksum again with the snapshot files) but to + // avoid extra complexity we simply delete them and restore them again like StoreRecovery + // does with dangling indices. Any existing store file that is not restored from the snapshot + // will be clean up by RecoveryTarget.cleanFiles(). + final String name = fileToRecover.name(); + if (deleteIfExistFiles.contains(name)) { + logger.trace("[{}] deleting pre-existing file [{}]", shardId, name); + store.directory().deleteFile(name); + } + + logger.trace("[{}] restoring file [{}]", shardId, fileToRecover.name()); + restoreFile(fileToRecover, store); + } + + // read the snapshot data persisted + final SegmentInfos segmentCommitInfos; + try { + segmentCommitInfos = Lucene.pruneUnreferencedFiles(restoredSegmentsFile.name(), store.directory()); + } catch (IOException e) { + throw new IndexShardRestoreFailedException(shardId, "Failed to fetch index version after copying it over", e); + } + recoveryState.getIndex().updateVersion(segmentCommitInfos.getVersion()); + + // now, go over and clean files that are in the store, but were not in the snapshot + store.cleanupAndVerify("restore complete from remote", sourceMetaData); + } finally { + store.decRef(); + } + } + + private void restoreFile(StoreFileMetaData fileToRecover, Store store) throws IOException { +// boolean success = false; +// +// try (InputStream stream = new RestoreFileInputStream(remoteClient, sessionUUID, nodeId, fileToRecover)) { +// try (IndexOutput indexOutput = store.createVerifyingOutput(fileToRecover.name(), fileToRecover, IOContext.DEFAULT)) { +// final byte[] buffer = new byte[BUFFER_SIZE]; +// int length; +// while ((length = stream.read(buffer)) > 0) { +// indexOutput.writeBytes(buffer, 0, length); +// recoveryState.getIndex().addRecoveredBytesToFile(fileToRecover.name(), length); +// } +// Store.verify(indexOutput); +// indexOutput.close(); +// store.directory().sync(Collections.singleton(fileToRecover.name())); +// success = true; +// } catch (CorruptIndexException | IndexFormatTooOldException | IndexFormatTooNewException ex) { +// try { +// store.markStoreCorrupted(ex); +// } catch (IOException e) { +// logger.warn("store cannot be marked as corrupted", e); +// } +// throw ex; +// } finally { +// if (success == false) { +// store.deleteQuiet(fileToRecover.name()); +// } +// } +// } + } + } + +// private static class RestoreFileInputStream extends InputStream { +// +// private final Client remoteClient; +// private final String sessionUUID; +// private final String nodeId; +// private final StoreFileMetaData fileInfo; +// +// private long pos = 0; +// +// private RestoreFileInputStream(Client remoteClient, String sessionUUID, String nodeId, StoreFileMetaData fileInfo) { +// this.remoteClient = remoteClient; +// this.sessionUUID = sessionUUID; +// this.nodeId = nodeId; +// this.fileInfo = fileInfo; +// } +// +// +// @Override +// public int read() throws IOException { +// throw new UnsupportedOperationException(); +// } +// +// @Override +// public int read(byte[] bytes, int off, int len) throws IOException { +// if (pos >= fileInfo.length()) { +// return 0; +// } +// +// GetCcrRestoreFileChunkRequest request = GetCcrRestoreFileChunkAction.createRequest(nodeId, sessionUUID, +// fileInfo.name(), pos, (int) Math.min(fileInfo.length() - pos, len)); +// byte[] fileChunk = remoteClient.execute(GetCcrRestoreFileChunkAction.INSTANCE, request).actionGet().getChunk(); +// +// pos += fileChunk.length; +// System.arraycopy(fileChunk, 0, bytes, off, fileChunk.length); +// return fileChunk.length; +// } +// } } From 9fac474b22c215de81f0f325c579587f0473897a Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Wed, 12 Dec 2018 15:34:57 -0700 Subject: [PATCH 03/14] WIP --- .../PutCcrRestoreSessionAction.java | 20 +++--- .../repository/CcrRestoreSourceService.java | 61 +++++++++++++++---- .../CcrRestoreSourceServiceTests.java | 24 ++++++++ 3 files changed, 79 insertions(+), 26 deletions(-) create mode 100644 x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/repository/CcrRestoreSourceServiceTests.java diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/PutCcrRestoreSessionAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/PutCcrRestoreSessionAction.java index 1450c3489e533..9d4fc16eb29a2 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/PutCcrRestoreSessionAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/PutCcrRestoreSessionAction.java @@ -15,12 +15,11 @@ import org.elasticsearch.cluster.routing.IndexShardRoutingTable; import org.elasticsearch.cluster.routing.ShardsIterator; import org.elasticsearch.cluster.service.ClusterService; -import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; -import org.elasticsearch.index.engine.Engine; +import org.elasticsearch.common.util.iterable.Iterables; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardNotFoundException; @@ -76,17 +75,12 @@ protected PutCcrRestoreSessionResponse shardOperation(PutCcrRestoreSessionReques if (indexShard == null) { throw new ShardNotFoundException(shardId); } - Engine.IndexCommitRef commit = indexShard.acquireSafeIndexCommit(); - String sessionUUID = UUIDs.randomBase64UUID(); - ccrRestoreService.addCommit(sessionUUID, commit); - final Store.MetadataSnapshot snapshot; - indexShard.store().incRef(); - try { - snapshot = indexShard.store().getMetadata(commit.getIndexCommit()); - } finally { - indexShard.store().decRef(); - } - return new PutCcrRestoreSessionResponse(indexShard.routingEntry().currentNodeId(), new ArrayList<>(), new ArrayList<>()); + Store.MetadataSnapshot sourceMetaData = ccrRestoreService.openSession(request.getSessionUUID(), indexShard); + Store.RecoveryDiff recoveryDiff = sourceMetaData.recoveryDiff(request.getMetaData()); + + ArrayList filesToRecover = new ArrayList<>(recoveryDiff.different); + filesToRecover.addAll(recoveryDiff.missing); + return new PutCcrRestoreSessionResponse(indexShard.routingEntry().currentNodeId(), recoveryDiff.identical, filesToRecover); } @Override diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRestoreSourceService.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRestoreSourceService.java index 4f26e7105cf79..0f76d1d1c282d 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRestoreSourceService.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRestoreSourceService.java @@ -1,27 +1,56 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + package org.elasticsearch.xpack.ccr.repository; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.common.Nullable; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.index.engine.Engine; +import org.elasticsearch.index.shard.IndexEventListener; +import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.index.store.Store; import java.io.IOException; +import java.util.HashMap; +import java.util.HashSet; import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; -public class CcrRestoreSourceService extends AbstractLifecycleComponent { +public class CcrRestoreSourceService extends AbstractLifecycleComponent implements IndexEventListener { private static final Logger logger = LogManager.getLogger(CcrRestoreSourceService.class); - final private Map onGoingRestores = ConcurrentCollections.newConcurrentMap(); + private final AtomicBoolean isOpen = new AtomicBoolean(true); + private final Map onGoingRestores = ConcurrentCollections.newConcurrentMap(); + private final Map> sessionsForShard = new HashMap<>(); public CcrRestoreSourceService(Settings settings) { super(settings); } + @Override + public synchronized void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexShard, Settings indexSettings) { + if (indexShard != null) { + HashSet sessions = sessionsForShard.remove(indexShard); + if (sessions != null) { + for (String sessionUUID : sessions) { + Engine.IndexCommitRef commit = onGoingRestores.remove(sessionUUID); + IOUtils.closeWhileHandlingException(commit); + } + } + } + } + @Override protected void doStart() { @@ -34,26 +63,32 @@ protected void doStop() { @Override protected void doClose() throws IOException { - IOUtils.closeWhileHandlingException(onGoingRestores.values()); + if (isOpen.compareAndSet(true, false)) { + IOUtils.closeWhileHandlingException(onGoingRestores.values()); + } } - public void addCommit(String sessionUUID, Engine.IndexCommitRef commit) { + public synchronized Store.MetadataSnapshot openSession(String sessionUUID, IndexShard indexShard) throws IOException { + Engine.IndexCommitRef commit = indexShard.acquireSafeIndexCommit(); onGoingRestores.put(sessionUUID, commit); - } - - public Engine.IndexCommitRef getCommit(String sessionUUID) { - Engine.IndexCommitRef commit = onGoingRestores.get(sessionUUID); - if (commit == null) { - throw new ElasticsearchException("commit for [" + sessionUUID + "] not found"); + indexShard.store().incRef(); + try { + return indexShard.store().getMetadata(commit.getIndexCommit()); + } finally { + indexShard.store().decRef(); } - return commit; } - public void closeCommit(String sessionUUID) { - Engine.IndexCommitRef commit = onGoingRestores.remove(sessionUUID); + public synchronized void closeSession(String sessionUUID, IndexShard indexShard) { + Engine.IndexCommitRef commit = onGoingRestores.get(sessionUUID); if (commit == null) { throw new ElasticsearchException("commit for [" + sessionUUID + "] not found"); } IOUtils.closeWhileHandlingException(commit); + HashSet sessions = sessionsForShard.get(indexShard); + sessions.remove(sessionUUID); + if (sessions.isEmpty()) { + sessionsForShard.remove(indexShard); + } } } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/repository/CcrRestoreSourceServiceTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/repository/CcrRestoreSourceServiceTests.java new file mode 100644 index 0000000000000..21a2b17fda1f2 --- /dev/null +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/repository/CcrRestoreSourceServiceTests.java @@ -0,0 +1,24 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.ccr.repository; + +import org.elasticsearch.test.ESTestCase; + +public class CcrRestoreSourceServiceTests extends ESTestCase { + + public void testOpenSession() { + + } + + public void testCloseSession() { + + } + + public void testCloseShardListenerFunctionality() { + + } +} From 7b79a5e1802ac20b4c4482dfdbe601893d4f0f32 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Wed, 12 Dec 2018 17:19:40 -0700 Subject: [PATCH 04/14] WIP --- .../DeleteCcrRestoreSessionAction.java | 12 +- .../DeleteCcrRestoreSessionRequest.java | 29 ++- .../xpack/ccr/repository/CcrRepository.java | 217 ++---------------- .../repository/CcrRestoreSourceService.java | 32 ++- .../CcrRestoreSourceServiceTests.java | 104 ++++++++- 5 files changed, 176 insertions(+), 218 deletions(-) diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/DeleteCcrRestoreSessionAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/DeleteCcrRestoreSessionAction.java index 6413889cd22ad..c7ba66babee20 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/DeleteCcrRestoreSessionAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/DeleteCcrRestoreSessionAction.java @@ -12,6 +12,7 @@ import org.elasticsearch.action.support.single.shard.TransportSingleShardAction; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.routing.IndexShardRoutingTable; import org.elasticsearch.cluster.routing.ShardsIterator; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.UUIDs; @@ -60,7 +61,11 @@ public TransportDeleteCcrRestoreSessionAction(ThreadPool threadPool, ClusterServ @Override protected DeleteCcrRestoreSessionResponse shardOperation(DeleteCcrRestoreSessionRequest request, ShardId shardId) throws IOException { - // TODO: Implement + IndexShard indexShard = indicesService.getShardOrNull(shardId); + if (indexShard == null) { + throw new ShardNotFoundException(shardId); + } + ccrRestoreService.closeSession(request.getSessionUUID(), indexShard); return new DeleteCcrRestoreSessionResponse(); } @@ -76,7 +81,10 @@ protected boolean resolveIndex(DeleteCcrRestoreSessionRequest request) { @Override protected ShardsIterator shards(ClusterState state, InternalRequest request) { - return null; + final ShardId shardId = request.request().getShardId(); + // The index uuid is not correct if we restore with a rename + IndexShardRoutingTable shardRoutingTable = state.routingTable().shardRoutingTable(shardId.getIndexName(), shardId.id()); + return shardRoutingTable.primaryShardIt(); } } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/DeleteCcrRestoreSessionRequest.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/DeleteCcrRestoreSessionRequest.java index d80099912a990..caf91df9c1fb0 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/DeleteCcrRestoreSessionRequest.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/DeleteCcrRestoreSessionRequest.java @@ -16,11 +16,16 @@ public class DeleteCcrRestoreSessionRequest extends SingleShardRequest { - public DeleteCcrRestoreSessionRequest() { + private String sessionUUID; + private ShardId shardId; + + DeleteCcrRestoreSessionRequest() { } - public DeleteCcrRestoreSessionRequest(ShardId shardId) { + public DeleteCcrRestoreSessionRequest(String sessionUUID, ShardId shardId) { super(shardId.getIndexName()); + this.sessionUUID = sessionUUID; + this.shardId = shardId; } @Override @@ -29,12 +34,24 @@ public ActionRequestValidationException validate() { } @Override - public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); + public void readFrom(StreamInput streamInput) throws IOException { + super.readFrom(streamInput); + sessionUUID = streamInput.readString(); + shardId = ShardId.readShardId(streamInput); } @Override - public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); + public void writeTo(StreamOutput streamOutput) throws IOException { + super.writeTo(streamOutput); + streamOutput.writeString(sessionUUID); + shardId.writeTo(streamOutput); + } + + public String getSessionUUID() { + return sessionUUID; + } + + public ShardId getShardId() { + return shardId; } } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java index b4a57fb7583c2..a3b731d078bec 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java @@ -8,9 +8,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.apache.lucene.index.CorruptIndexException; import org.apache.lucene.index.IndexCommit; -import org.apache.lucene.index.SegmentInfos; import org.elasticsearch.Version; import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; import org.elasticsearch.action.support.PlainActionFuture; @@ -23,18 +21,15 @@ import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.component.AbstractLifecycleComponent; -import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.util.iterable.Iterables; import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexNotFoundException; +import org.elasticsearch.index.engine.EngineException; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShardRecoveryException; import org.elasticsearch.index.shard.ShardId; -import org.elasticsearch.index.snapshots.IndexShardRestoreFailedException; import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus; import org.elasticsearch.index.store.Store; -import org.elasticsearch.index.store.StoreFileMetaData; import org.elasticsearch.indices.recovery.RecoveryState; import org.elasticsearch.repositories.IndexId; import org.elasticsearch.repositories.Repository; @@ -52,7 +47,6 @@ import java.io.IOException; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -243,35 +237,37 @@ public void snapshotShard(IndexShard shard, Store store, SnapshotId snapshotId, @Override public void restoreShard(IndexShard indexShard, SnapshotId snapshotId, Version version, IndexId indexId, ShardId shardId, RecoveryState recoveryState) { - Client remoteClient = client.getRemoteClusterClient(remoteClusterAlias); - String sessionUUID = UUIDs.randomBase64UUID(); - Store.MetadataSnapshot recoveryTargetMetadata; + final Store store = indexShard.store(); + store.incRef(); try { - // this will throw an IOException if the store has no segments infos file. The - // store can still have existing files but they will be deleted just before being - // restored. - recoveryTargetMetadata = indexShard.snapshotStoreMetadata(); - } catch (IndexNotFoundException e) { - // happens when restore to an empty shard, not a big deal - logger.trace("[{}] [{}] restoring from to an empty shard", shardId, SNAPSHOT_ID); - recoveryTargetMetadata = Store.MetadataSnapshot.EMPTY; - } catch (IOException e) { - logger.warn("{} Can't read metadata from store, will not reuse any local file while restoring", shardId, e); - recoveryTargetMetadata = Store.MetadataSnapshot.EMPTY; + store.createEmpty(indexShard.indexSettings().getIndexMetaData().getCreationVersion().luceneVersion); + } catch (EngineException | IOException e) { + throw new IndexShardRecoveryException(shardId, "failed to create empty store", e); + } finally { + store.decRef(); } + Store.MetadataSnapshot recoveryMetadata; + try { + recoveryMetadata = indexShard.snapshotStoreMetadata(); + } catch (IOException e) { + throw new IndexShardRecoveryException(shardId, "failed access store metadata", e); + } + Client remoteClient = client.getRemoteClusterClient(remoteClusterAlias); + String sessionUUID = UUIDs.randomBase64UUID(); PutCcrRestoreSessionAction.PutCcrRestoreSessionResponse response = remoteClient.execute(PutCcrRestoreSessionAction.INSTANCE, - new PutCcrRestoreSessionRequest(sessionUUID, shardId, recoveryTargetMetadata)).actionGet(); + new PutCcrRestoreSessionRequest(sessionUUID, shardId, recoveryMetadata)).actionGet(); + // The nodeId is necessary to route file chunk requests to appropriate node String nodeId = response.getNodeId(); try { - RestoreSession restoreSession = new RestoreSession(remoteClient, null, nodeId, indexShard, recoveryState, null); - restoreSession.restore(); + // Implement file restore } catch (Exception e) { throw new IndexShardRecoveryException(shardId, "failed to recover from gateway", e); } finally { - remoteClient.execute(DeleteCcrRestoreSessionAction.INSTANCE, new DeleteCcrRestoreSessionRequest()).actionGet(); + DeleteCcrRestoreSessionRequest deleteRequest = new DeleteCcrRestoreSessionRequest(sessionUUID, shardId); + remoteClient.execute(DeleteCcrRestoreSessionAction.INSTANCE, deleteRequest).actionGet(); } } @@ -279,175 +275,4 @@ public void restoreShard(IndexShard indexShard, SnapshotId snapshotId, Version v public IndexShardSnapshotStatus getShardSnapshotStatus(SnapshotId snapshotId, Version version, IndexId indexId, ShardId shardId) { throw new UnsupportedOperationException("Unsupported for repository of type: " + TYPE); } - - private static class RestoreSession { - - private final int BUFFER_SIZE = 1 << 16; - - private final Client remoteClient; - private final String sessionUUID; - private final String nodeId; - private final IndexShard indexShard; - private final RecoveryState recoveryState; - private final Store.MetadataSnapshot sourceMetaData; - - RestoreSession(Client remoteClient, String sessionUUID, String nodeId, IndexShard indexShard, RecoveryState recoveryState, - Store.MetadataSnapshot sourceMetaData) { - this.remoteClient = remoteClient; - this.sessionUUID = sessionUUID; - this.nodeId = nodeId; - this.indexShard = indexShard; - this.recoveryState = recoveryState; - this.sourceMetaData = sourceMetaData; - } - - void restore() throws IOException { - final Store store = indexShard.store(); - store.incRef(); - try { - final ShardId shardId = indexShard.shardId(); - logger.debug("[{}] repository restoring shard [{}]", CcrRepository.TYPE, shardId); - - Store.MetadataSnapshot recoveryTargetMetadata; - try { - // this will throw an IOException if the store has no segments infos file. The - // store can still have existing files but they will be deleted just before being - // restored. - recoveryTargetMetadata = indexShard.snapshotStoreMetadata(); - } catch (IndexNotFoundException e) { - // happens when restore to an empty shard, not a big deal - logger.trace("[{}] [{}] restoring from to an empty shard", shardId, SNAPSHOT_ID); - recoveryTargetMetadata = Store.MetadataSnapshot.EMPTY; - } catch (IOException e) { - logger.warn("{} Can't read metadata from store, will not reuse any local file while restoring", shardId, e); - recoveryTargetMetadata = Store.MetadataSnapshot.EMPTY; - } - - final StoreFileMetaData restoredSegmentsFile = sourceMetaData.getSegmentsFile(); - if (restoredSegmentsFile == null) { - throw new IndexShardRestoreFailedException(shardId, "Snapshot has no segments file"); - } - - final Store.RecoveryDiff diff = sourceMetaData.recoveryDiff(recoveryTargetMetadata); - for (StoreFileMetaData fileMetaData : diff.identical) { - recoveryState.getIndex().addFileDetail(fileMetaData.name(), fileMetaData.length(), true); - logger.trace("[{}] not_recovering [{}] from [{}], exists in local store and is same", shardId, fileMetaData.name()); - } - - List filesToRecover = new ArrayList<>(); - for (StoreFileMetaData fileMetaData : Iterables.concat(diff.different, diff.missing)) { - filesToRecover.add(fileMetaData); - recoveryState.getIndex().addFileDetail(fileMetaData.name(), fileMetaData.length(), false); - logger.trace("[{}] recovering [{}], exists in local store but is different", shardId, fileMetaData.name()); - } - - if (filesToRecover.isEmpty()) { - logger.trace("no files to recover, all exists within the local store"); - } - - // list of all existing store files - final List deleteIfExistFiles = Arrays.asList(store.directory().listAll()); - - // restore the files from the snapshot to the Lucene store - for (final StoreFileMetaData fileToRecover : filesToRecover) { - // if a file with a same physical name already exist in the store we need to delete it - // before restoring it from the snapshot. We could be lenient and try to reuse the existing - // store files (and compare their names/length/checksum again with the snapshot files) but to - // avoid extra complexity we simply delete them and restore them again like StoreRecovery - // does with dangling indices. Any existing store file that is not restored from the snapshot - // will be clean up by RecoveryTarget.cleanFiles(). - final String name = fileToRecover.name(); - if (deleteIfExistFiles.contains(name)) { - logger.trace("[{}] deleting pre-existing file [{}]", shardId, name); - store.directory().deleteFile(name); - } - - logger.trace("[{}] restoring file [{}]", shardId, fileToRecover.name()); - restoreFile(fileToRecover, store); - } - - // read the snapshot data persisted - final SegmentInfos segmentCommitInfos; - try { - segmentCommitInfos = Lucene.pruneUnreferencedFiles(restoredSegmentsFile.name(), store.directory()); - } catch (IOException e) { - throw new IndexShardRestoreFailedException(shardId, "Failed to fetch index version after copying it over", e); - } - recoveryState.getIndex().updateVersion(segmentCommitInfos.getVersion()); - - // now, go over and clean files that are in the store, but were not in the snapshot - store.cleanupAndVerify("restore complete from remote", sourceMetaData); - } finally { - store.decRef(); - } - } - - private void restoreFile(StoreFileMetaData fileToRecover, Store store) throws IOException { -// boolean success = false; -// -// try (InputStream stream = new RestoreFileInputStream(remoteClient, sessionUUID, nodeId, fileToRecover)) { -// try (IndexOutput indexOutput = store.createVerifyingOutput(fileToRecover.name(), fileToRecover, IOContext.DEFAULT)) { -// final byte[] buffer = new byte[BUFFER_SIZE]; -// int length; -// while ((length = stream.read(buffer)) > 0) { -// indexOutput.writeBytes(buffer, 0, length); -// recoveryState.getIndex().addRecoveredBytesToFile(fileToRecover.name(), length); -// } -// Store.verify(indexOutput); -// indexOutput.close(); -// store.directory().sync(Collections.singleton(fileToRecover.name())); -// success = true; -// } catch (CorruptIndexException | IndexFormatTooOldException | IndexFormatTooNewException ex) { -// try { -// store.markStoreCorrupted(ex); -// } catch (IOException e) { -// logger.warn("store cannot be marked as corrupted", e); -// } -// throw ex; -// } finally { -// if (success == false) { -// store.deleteQuiet(fileToRecover.name()); -// } -// } -// } - } - } - -// private static class RestoreFileInputStream extends InputStream { -// -// private final Client remoteClient; -// private final String sessionUUID; -// private final String nodeId; -// private final StoreFileMetaData fileInfo; -// -// private long pos = 0; -// -// private RestoreFileInputStream(Client remoteClient, String sessionUUID, String nodeId, StoreFileMetaData fileInfo) { -// this.remoteClient = remoteClient; -// this.sessionUUID = sessionUUID; -// this.nodeId = nodeId; -// this.fileInfo = fileInfo; -// } -// -// -// @Override -// public int read() throws IOException { -// throw new UnsupportedOperationException(); -// } -// -// @Override -// public int read(byte[] bytes, int off, int len) throws IOException { -// if (pos >= fileInfo.length()) { -// return 0; -// } -// -// GetCcrRestoreFileChunkRequest request = GetCcrRestoreFileChunkAction.createRequest(nodeId, sessionUUID, -// fileInfo.name(), pos, (int) Math.min(fileInfo.length() - pos, len)); -// byte[] fileChunk = remoteClient.execute(GetCcrRestoreFileChunkAction.INSTANCE, request).actionGet().getChunk(); -// -// pos += fileChunk.length; -// System.arraycopy(fileChunk, 0, bytes, off, fileChunk.length); -// return fileChunk.length; -// } -// } } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRestoreSourceService.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRestoreSourceService.java index 0f76d1d1c282d..48abc9d8c07db 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRestoreSourceService.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRestoreSourceService.java @@ -6,8 +6,8 @@ package org.elasticsearch.xpack.ccr.repository; -import org.apache.log4j.LogManager; -import org.apache.log4j.Logger; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.component.AbstractLifecycleComponent; @@ -24,13 +24,11 @@ import java.util.HashMap; import java.util.HashSet; import java.util.Map; -import java.util.concurrent.atomic.AtomicBoolean; public class CcrRestoreSourceService extends AbstractLifecycleComponent implements IndexEventListener { private static final Logger logger = LogManager.getLogger(CcrRestoreSourceService.class); - private final AtomicBoolean isOpen = new AtomicBoolean(true); private final Map onGoingRestores = ConcurrentCollections.newConcurrentMap(); private final Map> sessionsForShard = new HashMap<>(); @@ -38,12 +36,15 @@ public CcrRestoreSourceService(Settings settings) { super(settings); } + // TODO: Need to register with IndicesService @Override public synchronized void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexShard, Settings indexSettings) { if (indexShard != null) { + logger.debug("shard [{}] closing, closing sessions", indexShard); HashSet sessions = sessionsForShard.remove(indexShard); if (sessions != null) { for (String sessionUUID : sessions) { + logger.debug("closing session [{}] for shard [{}]", sessionUUID, indexShard); Engine.IndexCommitRef commit = onGoingRestores.remove(sessionUUID); IOUtils.closeWhileHandlingException(commit); } @@ -63,12 +64,23 @@ protected void doStop() { @Override protected void doClose() throws IOException { - if (isOpen.compareAndSet(true, false)) { - IOUtils.closeWhileHandlingException(onGoingRestores.values()); - } + IOUtils.closeWhileHandlingException(onGoingRestores.values()); + } + + // default visibility for testing + synchronized HashSet getSessionsForShard(IndexShard indexShard) { + return sessionsForShard.get(indexShard); + } + + // default visibility for testing + synchronized Engine.IndexCommitRef getIndexCommit(String sessionUUID) { + return onGoingRestores.get(sessionUUID); } public synchronized Store.MetadataSnapshot openSession(String sessionUUID, IndexShard indexShard) throws IOException { + logger.debug("opening session [{}] for shard [{}]", sessionUUID, indexShard); + HashSet sessions = sessionsForShard.computeIfAbsent(indexShard, (s) -> new HashSet<>()); + sessions.add(sessionUUID); Engine.IndexCommitRef commit = indexShard.acquireSafeIndexCommit(); onGoingRestores.put(sessionUUID, commit); indexShard.store().incRef(); @@ -80,9 +92,11 @@ public synchronized Store.MetadataSnapshot openSession(String sessionUUID, Index } public synchronized void closeSession(String sessionUUID, IndexShard indexShard) { - Engine.IndexCommitRef commit = onGoingRestores.get(sessionUUID); + logger.debug("closing session [{}] for shard [{}]", sessionUUID, indexShard); + Engine.IndexCommitRef commit = onGoingRestores.remove(sessionUUID); if (commit == null) { - throw new ElasticsearchException("commit for [" + sessionUUID + "] not found"); + logger.info("could not close session [{}] for shard [{}] because session not found", sessionUUID, indexShard); + throw new ElasticsearchException("session [" + sessionUUID + "] not found"); } IOUtils.closeWhileHandlingException(commit); HashSet sessions = sessionsForShard.get(indexShard); diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/repository/CcrRestoreSourceServiceTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/repository/CcrRestoreSourceServiceTests.java index 21a2b17fda1f2..7362fbfe33442 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/repository/CcrRestoreSourceServiceTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/repository/CcrRestoreSourceServiceTests.java @@ -6,19 +6,113 @@ package org.elasticsearch.xpack.ccr.repository; -import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.common.UUIDs; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.index.shard.IndexShardTestCase; +import org.junit.Before; -public class CcrRestoreSourceServiceTests extends ESTestCase { +import java.io.IOException; +import java.util.HashSet; - public void testOpenSession() { +public class CcrRestoreSourceServiceTests extends IndexShardTestCase { + private CcrRestoreSourceService restoreSourceService; + + @Before + public void setUp() throws Exception { + super.setUp(); + restoreSourceService = new CcrRestoreSourceService(Settings.EMPTY); } - public void testCloseSession() { + public void testOpenSession() throws IOException { + IndexShard indexShard1 = newStartedShard(true); + IndexShard indexShard2 = newStartedShard(true); + final String sessionUUID1 = UUIDs.randomBase64UUID(); + final String sessionUUID2 = UUIDs.randomBase64UUID(); + final String sessionUUID3 = UUIDs.randomBase64UUID(); + + assertNull(restoreSourceService.getSessionsForShard(indexShard1)); + + assertNotNull(restoreSourceService.openSession(sessionUUID1, indexShard1)); + HashSet sessionsForShard = restoreSourceService.getSessionsForShard(indexShard1); + assertEquals(1, sessionsForShard.size()); + assertTrue(sessionsForShard.contains(sessionUUID1)); + assertNotNull(restoreSourceService.openSession(sessionUUID2, indexShard1)); + sessionsForShard = restoreSourceService.getSessionsForShard(indexShard1); + assertEquals(2, sessionsForShard.size()); + assertTrue(sessionsForShard.contains(sessionUUID2)); + + assertNull(restoreSourceService.getSessionsForShard(indexShard2)); + assertNotNull(restoreSourceService.openSession(sessionUUID3, indexShard2)); + sessionsForShard = restoreSourceService.getSessionsForShard(indexShard2); + assertEquals(1, sessionsForShard.size()); + assertTrue(sessionsForShard.contains(sessionUUID3)); + + restoreSourceService.closeSession(sessionUUID1, indexShard1); + restoreSourceService.closeSession(sessionUUID2, indexShard1); + restoreSourceService.closeSession(sessionUUID3, indexShard2); + + closeShards(indexShard1, indexShard2); } - public void testCloseShardListenerFunctionality() { + public void testCloseSession() throws IOException { + IndexShard indexShard1 = newStartedShard(true); + IndexShard indexShard2 = newStartedShard(true); + final String sessionUUID1 = UUIDs.randomBase64UUID(); + final String sessionUUID2 = UUIDs.randomBase64UUID(); + final String sessionUUID3 = UUIDs.randomBase64UUID(); + + restoreSourceService.openSession(sessionUUID1, indexShard1); + restoreSourceService.openSession(sessionUUID2, indexShard1); + restoreSourceService.openSession(sessionUUID3, indexShard2); + + assertEquals(2, restoreSourceService.getSessionsForShard(indexShard1).size()); + assertEquals(1, restoreSourceService.getSessionsForShard(indexShard2).size()); + assertNotNull(restoreSourceService.getIndexCommit(sessionUUID1)); + assertNotNull(restoreSourceService.getIndexCommit(sessionUUID2)); + assertNotNull(restoreSourceService.getIndexCommit(sessionUUID3)); + + restoreSourceService.closeSession(sessionUUID1, indexShard1); + assertEquals(1, restoreSourceService.getSessionsForShard(indexShard1).size()); + assertNull(restoreSourceService.getIndexCommit(sessionUUID1)); + assertFalse(restoreSourceService.getSessionsForShard(indexShard1).contains(sessionUUID1)); + assertTrue(restoreSourceService.getSessionsForShard(indexShard1).contains(sessionUUID2)); + + restoreSourceService.closeSession(sessionUUID2, indexShard1); + assertNull(restoreSourceService.getSessionsForShard(indexShard1)); + assertNull(restoreSourceService.getIndexCommit(sessionUUID2)); + + restoreSourceService.closeSession(sessionUUID3, indexShard2); + assertNull(restoreSourceService.getSessionsForShard(indexShard2)); + assertNull(restoreSourceService.getIndexCommit(sessionUUID3)); + + closeShards(indexShard1, indexShard2); + } + + public void testCloseShardListenerFunctionality() throws IOException { + IndexShard indexShard1 = newStartedShard(true); + IndexShard indexShard2 = newStartedShard(true); + final String sessionUUID1 = UUIDs.randomBase64UUID(); + final String sessionUUID2 = UUIDs.randomBase64UUID(); + final String sessionUUID3 = UUIDs.randomBase64UUID(); + + restoreSourceService.openSession(sessionUUID1, indexShard1); + restoreSourceService.openSession(sessionUUID2, indexShard1); + restoreSourceService.openSession(sessionUUID3, indexShard2); + + assertEquals(2, restoreSourceService.getSessionsForShard(indexShard1).size()); + assertEquals(1, restoreSourceService.getSessionsForShard(indexShard2).size()); + + restoreSourceService.beforeIndexShardClosed(indexShard1.shardId(), indexShard1, Settings.EMPTY); + + assertNull(restoreSourceService.getSessionsForShard(indexShard1)); + assertNull(restoreSourceService.getIndexCommit(sessionUUID1)); + assertNull(restoreSourceService.getIndexCommit(sessionUUID2)); + + restoreSourceService.closeSession(sessionUUID3, indexShard2); + closeShards(indexShard1, indexShard2); } } From fe3ef848e64deefbcc06cd862b2998e89fa8fe38 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Wed, 12 Dec 2018 17:46:37 -0700 Subject: [PATCH 05/14] WIP --- .../DeleteCcrRestoreSessionAction.java | 9 +--- .../PutCcrRestoreSessionAction.java | 1 - .../PutCcrRestoreSessionRequest.java | 2 +- .../xpack/ccr/repository/CcrRepository.java | 5 --- .../repository/CcrRestoreSourceService.java | 34 +++++++++++--- .../xpack/ccr/CcrRepositoryIT.java | 44 +++++++++++++++++++ 6 files changed, 76 insertions(+), 19 deletions(-) diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/DeleteCcrRestoreSessionAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/DeleteCcrRestoreSessionAction.java index c7ba66babee20..f8696f3fea24a 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/DeleteCcrRestoreSessionAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/DeleteCcrRestoreSessionAction.java @@ -15,26 +15,21 @@ import org.elasticsearch.cluster.routing.IndexShardRoutingTable; import org.elasticsearch.cluster.routing.ShardsIterator; import org.elasticsearch.cluster.service.ClusterService; -import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardNotFoundException; -import org.elasticsearch.index.store.Store; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.ccr.repository.CcrRestoreSourceService; -import java.io.IOException; - public class DeleteCcrRestoreSessionAction extends Action { public static final DeleteCcrRestoreSessionAction INSTANCE = new DeleteCcrRestoreSessionAction(); private static final String NAME = "internal:admin/ccr/restore/session/delete"; - protected DeleteCcrRestoreSessionAction() { + private DeleteCcrRestoreSessionAction() { super(NAME); } @@ -60,7 +55,7 @@ public TransportDeleteCcrRestoreSessionAction(ThreadPool threadPool, ClusterServ } @Override - protected DeleteCcrRestoreSessionResponse shardOperation(DeleteCcrRestoreSessionRequest request, ShardId shardId) throws IOException { + protected DeleteCcrRestoreSessionResponse shardOperation(DeleteCcrRestoreSessionRequest request, ShardId shardId) { IndexShard indexShard = indicesService.getShardOrNull(shardId); if (indexShard == null) { throw new ShardNotFoundException(shardId); diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/PutCcrRestoreSessionAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/PutCcrRestoreSessionAction.java index 9d4fc16eb29a2..88fb1fefc2eed 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/PutCcrRestoreSessionAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/PutCcrRestoreSessionAction.java @@ -19,7 +19,6 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; -import org.elasticsearch.common.util.iterable.Iterables; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardNotFoundException; diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/PutCcrRestoreSessionRequest.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/PutCcrRestoreSessionRequest.java index dee0b9621335d..e9ad0181c4cb5 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/PutCcrRestoreSessionRequest.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/PutCcrRestoreSessionRequest.java @@ -21,7 +21,7 @@ public class PutCcrRestoreSessionRequest extends SingleShardRequest onGoingRestores = ConcurrentCollections.newConcurrentMap(); private final Map> sessionsForShard = new HashMap<>(); + private final CopyOnWriteArrayList> openSessionListeners = new CopyOnWriteArrayList<>(); + private final CopyOnWriteArrayList> closeSessionListeners = new CopyOnWriteArrayList<>(); public CcrRestoreSourceService(Settings settings) { super(settings); @@ -63,8 +67,20 @@ protected void doStop() { } @Override - protected void doClose() throws IOException { + protected synchronized void doClose() throws IOException { + sessionsForShard.clear(); IOUtils.closeWhileHandlingException(onGoingRestores.values()); + onGoingRestores.clear(); + } + + // TODO: The listeners are for testing. Once end-to-end file restore is implemented and can be tested, + // these should be removed. + public void addOpenSessionListener(Consumer listener) { + openSessionListeners.add(listener); + } + + public void addCloseSessionListener(Consumer listener) { + closeSessionListeners.add(listener); } // default visibility for testing @@ -79,10 +95,17 @@ synchronized Engine.IndexCommitRef getIndexCommit(String sessionUUID) { public synchronized Store.MetadataSnapshot openSession(String sessionUUID, IndexShard indexShard) throws IOException { logger.debug("opening session [{}] for shard [{}]", sessionUUID, indexShard); - HashSet sessions = sessionsForShard.computeIfAbsent(indexShard, (s) -> new HashSet<>()); - sessions.add(sessionUUID); - Engine.IndexCommitRef commit = indexShard.acquireSafeIndexCommit(); - onGoingRestores.put(sessionUUID, commit); + Engine.IndexCommitRef commit; + if (onGoingRestores.containsKey(sessionUUID)) { + logger.debug("session [{}] already exists", sessionUUID); + commit = onGoingRestores.get(sessionUUID); + } else { + commit = indexShard.acquireSafeIndexCommit(); + onGoingRestores.put(sessionUUID, commit); + openSessionListeners.forEach(c -> c.accept(sessionUUID)); + HashSet sessions = sessionsForShard.computeIfAbsent(indexShard, (s) -> new HashSet<>()); + sessions.add(sessionUUID); + } indexShard.store().incRef(); try { return indexShard.store().getMetadata(commit.getIndexCommit()); @@ -93,6 +116,7 @@ public synchronized Store.MetadataSnapshot openSession(String sessionUUID, Index public synchronized void closeSession(String sessionUUID, IndexShard indexShard) { logger.debug("closing session [{}] for shard [{}]", sessionUUID, indexShard); + closeSessionListeners.forEach(c -> c.accept(sessionUUID)); Engine.IndexCommitRef commit = onGoingRestores.remove(sessionUUID); if (commit == null) { logger.info("could not close session [{}] for shard [{}] because session not found", sessionUUID, indexShard); diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRepositoryIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRepositoryIT.java index aff5a2862e17a..3b38035c6e9af 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRepositoryIT.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRepositoryIT.java @@ -19,6 +19,7 @@ import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.shard.ShardId; @@ -31,9 +32,11 @@ import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.CcrIntegTestCase; import org.elasticsearch.xpack.ccr.repository.CcrRepository; +import org.elasticsearch.xpack.ccr.repository.CcrRestoreSourceService; import java.io.IOException; import java.util.Map; +import java.util.Set; import java.util.concurrent.TimeUnit; import static java.util.Collections.singletonMap; @@ -151,6 +154,47 @@ public void testThatRepositoryRecoversEmptyIndexBasedOnLeaderSettings() throws I assertNotEquals(leaderMetadata.getIndexUUID(), followerMetadata.getIndexUUID()); } + public void testThatSessionIsRegisteredWithPrimaryShard() throws IOException { + String leaderClusterRepoName = CcrRepository.NAME_PREFIX + "leader_cluster"; + String leaderIndex = "index1"; + String followerIndex = "index2"; + + final int numberOfPrimaryShards = randomIntBetween(1, 3); + final String leaderIndexSettings = getIndexSettings(numberOfPrimaryShards, between(0, 1), + singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true")); + assertAcked(leaderClient().admin().indices().prepareCreate(leaderIndex).setSource(leaderIndexSettings, XContentType.JSON)); + ensureLeaderGreen(leaderIndex); + + final RestoreService restoreService = getFollowerCluster().getCurrentMasterNodeInstance(RestoreService.class); + final ClusterService clusterService = getFollowerCluster().getCurrentMasterNodeInstance(ClusterService.class); + + Settings.Builder settingsBuilder = Settings.builder() + .put(IndexMetaData.SETTING_INDEX_PROVIDED_NAME, followerIndex) + .put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true); + RestoreService.RestoreRequest restoreRequest = new RestoreService.RestoreRequest(leaderClusterRepoName, + CcrRepository.LATEST, new String[]{leaderIndex}, indicesOptions, + "^(.*)$", followerIndex, Settings.EMPTY, new TimeValue(1, TimeUnit.HOURS), false, + false, true, settingsBuilder.build(), new String[0], + "restore_snapshot[" + leaderClusterRepoName + ":" + leaderIndex + "]"); + + Set sessionsOpened = ConcurrentCollections.newConcurrentSet(); + Set sessionsClosed = ConcurrentCollections.newConcurrentSet(); + for (CcrRestoreSourceService restoreSourceService : getLeaderCluster().getDataNodeInstances(CcrRestoreSourceService.class)) { + restoreSourceService.addOpenSessionListener(sessionsOpened::add); + restoreSourceService.addCloseSessionListener(sessionsClosed::add); + } + + PlainActionFuture future = PlainActionFuture.newFuture(); + restoreService.restoreSnapshot(restoreRequest, waitForRestore(clusterService, future)); + RestoreInfo restoreInfo = future.actionGet(); + + assertEquals(numberOfPrimaryShards, sessionsOpened.size()); + assertEquals(numberOfPrimaryShards, sessionsClosed.size()); + + assertEquals(restoreInfo.totalShards(), restoreInfo.successfulShards()); + assertEquals(0, restoreInfo.failedShards()); + } + private ActionListener waitForRestore(ClusterService clusterService, ActionListener listener) { return new ActionListener() { From e85bfa14453337194b9043ed240cdc8bc09531be Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Thu, 13 Dec 2018 10:20:57 -0700 Subject: [PATCH 06/14] Changes --- .../DeleteCcrRestoreSessionAction.java | 5 +- .../PutCcrRestoreSessionAction.java | 37 ++------------- .../xpack/ccr/repository/CcrRepository.java | 36 ++++++++++---- .../repository/CcrRestoreSourceService.java | 47 +++++++++++++------ .../CcrRestoreSourceServiceTests.java | 2 +- 5 files changed, 65 insertions(+), 62 deletions(-) diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/DeleteCcrRestoreSessionAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/DeleteCcrRestoreSessionAction.java index f8696f3fea24a..877280ff53852 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/DeleteCcrRestoreSessionAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/DeleteCcrRestoreSessionAction.java @@ -12,7 +12,6 @@ import org.elasticsearch.action.support.single.shard.TransportSingleShardAction; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; -import org.elasticsearch.cluster.routing.IndexShardRoutingTable; import org.elasticsearch.cluster.routing.ShardsIterator; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; @@ -77,9 +76,7 @@ protected boolean resolveIndex(DeleteCcrRestoreSessionRequest request) { @Override protected ShardsIterator shards(ClusterState state, InternalRequest request) { final ShardId shardId = request.request().getShardId(); - // The index uuid is not correct if we restore with a rename - IndexShardRoutingTable shardRoutingTable = state.routingTable().shardRoutingTable(shardId.getIndexName(), shardId.id()); - return shardRoutingTable.primaryShardIt(); + return state.routingTable().shardRoutingTable(shardId).primaryShardIt(); } } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/PutCcrRestoreSessionAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/PutCcrRestoreSessionAction.java index 88fb1fefc2eed..573f3eeee07da 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/PutCcrRestoreSessionAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/PutCcrRestoreSessionAction.java @@ -12,7 +12,6 @@ import org.elasticsearch.action.support.single.shard.TransportSingleShardAction; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; -import org.elasticsearch.cluster.routing.IndexShardRoutingTable; import org.elasticsearch.cluster.routing.ShardsIterator; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; @@ -22,16 +21,12 @@ import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardNotFoundException; -import org.elasticsearch.index.store.Store; -import org.elasticsearch.index.store.StoreFileMetaData; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.ccr.repository.CcrRestoreSourceService; import java.io.IOException; -import java.util.ArrayList; -import java.util.List; public class PutCcrRestoreSessionAction extends Action { @@ -74,12 +69,8 @@ protected PutCcrRestoreSessionResponse shardOperation(PutCcrRestoreSessionReques if (indexShard == null) { throw new ShardNotFoundException(shardId); } - Store.MetadataSnapshot sourceMetaData = ccrRestoreService.openSession(request.getSessionUUID(), indexShard); - Store.RecoveryDiff recoveryDiff = sourceMetaData.recoveryDiff(request.getMetaData()); - - ArrayList filesToRecover = new ArrayList<>(recoveryDiff.different); - filesToRecover.addAll(recoveryDiff.missing); - return new PutCcrRestoreSessionResponse(indexShard.routingEntry().currentNodeId(), recoveryDiff.identical, filesToRecover); + ccrRestoreService.openSession(request.getSessionUUID(), indexShard); + return new PutCcrRestoreSessionResponse(indexShard.routingEntry().currentNodeId()); } @Override @@ -95,9 +86,7 @@ protected boolean resolveIndex(PutCcrRestoreSessionRequest request) { @Override protected ShardsIterator shards(ClusterState state, InternalRequest request) { final ShardId shardId = request.request().getShardId(); - // The index uuid is not correct if we restore with a rename - IndexShardRoutingTable shardRoutingTable = state.routingTable().shardRoutingTable(shardId.getIndexName(), shardId.id()); - return shardRoutingTable.primaryShardIt(); + return state.routingTable().shardRoutingTable(shardId).primaryShardIt(); } } @@ -105,51 +94,33 @@ protected ShardsIterator shards(ClusterState state, InternalRequest request) { public static class PutCcrRestoreSessionResponse extends ActionResponse { private String nodeId; - private List identicalFiles; - private List filesToRecover; PutCcrRestoreSessionResponse() { } - PutCcrRestoreSessionResponse(String nodeId, List identicalFiles, List filesToRecover) { + PutCcrRestoreSessionResponse(String nodeId) { this.nodeId = nodeId; - this.identicalFiles = identicalFiles; - this.filesToRecover = filesToRecover; } PutCcrRestoreSessionResponse(StreamInput streamInput) throws IOException { super(streamInput); nodeId = streamInput.readString(); - identicalFiles = streamInput.readList(StoreFileMetaData::new); - filesToRecover = streamInput.readList(StoreFileMetaData::new); } @Override public void readFrom(StreamInput streamInput) throws IOException { super.readFrom(streamInput); nodeId = streamInput.readString(); - identicalFiles = streamInput.readList(StoreFileMetaData::new); - filesToRecover = streamInput.readList(StoreFileMetaData::new); } @Override public void writeTo(StreamOutput streamOutput) throws IOException { super.writeTo(streamOutput); streamOutput.writeString(nodeId); - streamOutput.writeList(identicalFiles); - streamOutput.writeList(filesToRecover); } public String getNodeId() { return nodeId; } - - public List getIdenticalFiles() { - return identicalFiles; - } - - public List getFilesToRecover() { - return filesToRecover; - } } } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java index 1ff509b4de52c..b68d8b0f83fd0 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java @@ -232,6 +232,7 @@ public void snapshotShard(IndexShard shard, Store store, SnapshotId snapshotId, @Override public void restoreShard(IndexShard indexShard, SnapshotId snapshotId, Version version, IndexId indexId, ShardId shardId, RecoveryState recoveryState) { + // TODO: Add timeouts to network calls / the restore process. final Store store = indexShard.store(); store.incRef(); try { @@ -249,25 +250,42 @@ public void restoreShard(IndexShard indexShard, SnapshotId snapshotId, Version v throw new IndexShardRecoveryException(shardId, "failed access store metadata", e); } + Map ccrMetaData = indexShard.indexSettings().getIndexMetaData().getCustomData(Ccr.CCR_CUSTOM_METADATA_KEY); + String leaderUUID = ccrMetaData.get(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_UUID_KEY); + ShardId leaderShardId = new ShardId(shardId.getIndexName(), leaderUUID, shardId.getId()); + Client remoteClient = client.getRemoteClusterClient(remoteClusterAlias); String sessionUUID = UUIDs.randomBase64UUID(); - PutCcrRestoreSessionAction.PutCcrRestoreSessionResponse response = remoteClient.execute(PutCcrRestoreSessionAction.INSTANCE, - new PutCcrRestoreSessionRequest(sessionUUID, shardId, recoveryMetadata)).actionGet(); - // The nodeId is necessary to route file chunk requests to appropriate node - String nodeId = response.getNodeId(); - + boolean success = false; try { - // Implement file restore + PutCcrRestoreSessionAction.PutCcrRestoreSessionResponse response = remoteClient.execute(PutCcrRestoreSessionAction.INSTANCE, + new PutCcrRestoreSessionRequest(sessionUUID, leaderShardId, recoveryMetadata)).actionGet(); + // The nodeId is necessary to route file chunk requests to appropriate node + String nodeId = response.getNodeId(); + + // TODO: Implement file restore + success = true; } catch (Exception e) { + try { + closeSession(remoteClient, sessionUUID, leaderShardId); + } catch (Exception closeException) { + e.addSuppressed(closeException); + } throw new IndexShardRecoveryException(shardId, "failed to recover from gateway", e); } finally { - DeleteCcrRestoreSessionRequest deleteRequest = new DeleteCcrRestoreSessionRequest(sessionUUID, shardId); - remoteClient.execute(DeleteCcrRestoreSessionAction.INSTANCE, deleteRequest).actionGet(); + if (success) { + closeSession(remoteClient, sessionUUID, leaderShardId); + } } } @Override - public IndexShardSnapshotStatus getShardSnapshotStatus(SnapshotId snapshotId, Version version, IndexId indexId, ShardId shardId) { + public IndexShardSnapshotStatus getShardSnapshotStatus(SnapshotId snapshotId, Version version, IndexId indexId, ShardId leaderShardId) { throw new UnsupportedOperationException("Unsupported for repository of type: " + TYPE); } + + private void closeSession(Client remoteClient, String sessionUUID, ShardId leaderShardId) { + DeleteCcrRestoreSessionRequest deleteRequest = new DeleteCcrRestoreSessionRequest(sessionUUID, leaderShardId); + remoteClient.execute(DeleteCcrRestoreSessionAction.INSTANCE, deleteRequest).actionGet(); + } } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRestoreSourceService.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRestoreSourceService.java index 562e28ddceb52..5898ce369fc92 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRestoreSourceService.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRestoreSourceService.java @@ -42,7 +42,7 @@ public CcrRestoreSourceService(Settings settings) { // TODO: Need to register with IndicesService @Override - public synchronized void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexShard, Settings indexSettings) { + public synchronized void afterIndexShardClosed(ShardId shardId, @Nullable IndexShard indexShard, Settings indexSettings) { if (indexShard != null) { logger.debug("shard [{}] closing, closing sessions", indexShard); HashSet sessions = sessionsForShard.remove(indexShard); @@ -93,24 +93,37 @@ synchronized Engine.IndexCommitRef getIndexCommit(String sessionUUID) { return onGoingRestores.get(sessionUUID); } + // TODO: Add a local timeout for the session. This timeout might might be for the entire session to be + // complete. Or it could be for session to have been touched. public synchronized Store.MetadataSnapshot openSession(String sessionUUID, IndexShard indexShard) throws IOException { logger.debug("opening session [{}] for shard [{}]", sessionUUID, indexShard); - Engine.IndexCommitRef commit; - if (onGoingRestores.containsKey(sessionUUID)) { - logger.debug("session [{}] already exists", sessionUUID); - commit = onGoingRestores.get(sessionUUID); - } else { - commit = indexShard.acquireSafeIndexCommit(); - onGoingRestores.put(sessionUUID, commit); - openSessionListeners.forEach(c -> c.accept(sessionUUID)); - HashSet sessions = sessionsForShard.computeIfAbsent(indexShard, (s) -> new HashSet<>()); - sessions.add(sessionUUID); - } - indexShard.store().incRef(); + boolean success = false; + Engine.IndexCommitRef commit = null; try { - return indexShard.store().getMetadata(commit.getIndexCommit()); + if (onGoingRestores.containsKey(sessionUUID)) { + logger.debug("session [{}] already exists", sessionUUID); + commit = onGoingRestores.get(sessionUUID); + } else { + commit = indexShard.acquireSafeIndexCommit(); + onGoingRestores.put(sessionUUID, commit); + openSessionListeners.forEach(c -> c.accept(sessionUUID)); + HashSet sessions = sessionsForShard.computeIfAbsent(indexShard, (s) -> new HashSet<>()); + sessions.add(sessionUUID); + } + indexShard.store().incRef(); + try { + Store.MetadataSnapshot metadata = indexShard.store().getMetadata(commit.getIndexCommit()); + success = true; + return metadata; + } finally { + indexShard.store().decRef(); + } } finally { - indexShard.store().decRef(); + if (success == false) { + onGoingRestores.remove(sessionUUID); + removeSessionForShard(sessionUUID, indexShard); + IOUtils.closeWhileHandlingException(commit); + } } } @@ -122,7 +135,11 @@ public synchronized void closeSession(String sessionUUID, IndexShard indexShard) logger.info("could not close session [{}] for shard [{}] because session not found", sessionUUID, indexShard); throw new ElasticsearchException("session [" + sessionUUID + "] not found"); } + removeSessionForShard(sessionUUID, indexShard); IOUtils.closeWhileHandlingException(commit); + } + + private void removeSessionForShard(String sessionUUID, IndexShard indexShard) { HashSet sessions = sessionsForShard.get(indexShard); sessions.remove(sessionUUID); if (sessions.isEmpty()) { diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/repository/CcrRestoreSourceServiceTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/repository/CcrRestoreSourceServiceTests.java index 7362fbfe33442..a8be8db2d9e12 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/repository/CcrRestoreSourceServiceTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/repository/CcrRestoreSourceServiceTests.java @@ -105,7 +105,7 @@ public void testCloseShardListenerFunctionality() throws IOException { assertEquals(2, restoreSourceService.getSessionsForShard(indexShard1).size()); assertEquals(1, restoreSourceService.getSessionsForShard(indexShard2).size()); - restoreSourceService.beforeIndexShardClosed(indexShard1.shardId(), indexShard1, Settings.EMPTY); + restoreSourceService.afterIndexShardClosed(indexShard1.shardId(), indexShard1, Settings.EMPTY); assertNull(restoreSourceService.getSessionsForShard(indexShard1)); assertNull(restoreSourceService.getIndexCommit(sessionUUID1)); From 71db9aa963ace5ae3d4dc3e597aa49feef409db9 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Thu, 13 Dec 2018 11:35:31 -0700 Subject: [PATCH 07/14] Change delete request type --- .../DeleteCcrRestoreSessionAction.java | 104 ++++++++++++++---- .../DeleteCcrRestoreSessionRequest.java | 11 +- .../repository/CcrRestoreSourceService.java | 7 ++ 3 files changed, 97 insertions(+), 25 deletions(-) diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/DeleteCcrRestoreSessionAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/DeleteCcrRestoreSessionAction.java index 877280ff53852..2e4bd11b7f744 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/DeleteCcrRestoreSessionAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/DeleteCcrRestoreSessionAction.java @@ -7,14 +7,17 @@ package org.elasticsearch.xpack.ccr.action.repositories; import org.elasticsearch.action.Action; -import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.FailedNodeException; import org.elasticsearch.action.support.ActionFilters; -import org.elasticsearch.action.support.single.shard.TransportSingleShardAction; -import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; -import org.elasticsearch.cluster.routing.ShardsIterator; +import org.elasticsearch.action.support.nodes.BaseNodeResponse; +import org.elasticsearch.action.support.nodes.BaseNodesResponse; +import org.elasticsearch.action.support.nodes.TransportNodesAction; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardNotFoundException; @@ -23,6 +26,9 @@ import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.ccr.repository.CcrRestoreSourceService; +import java.io.IOException; +import java.util.List; + public class DeleteCcrRestoreSessionAction extends Action { public static final DeleteCcrRestoreSessionAction INSTANCE = new DeleteCcrRestoreSessionAction(); @@ -34,52 +40,102 @@ private DeleteCcrRestoreSessionAction() { @Override public DeleteCcrRestoreSessionResponse newResponse() { - return new DeleteCcrRestoreSessionResponse(); + throw new UnsupportedOperationException(); } - public static class TransportDeleteCcrRestoreSessionAction - extends TransportSingleShardAction { + public static class TransportDeleteCcrRestoreSessionAction extends TransportNodesAction { private final IndicesService indicesService; private final CcrRestoreSourceService ccrRestoreService; @Inject public TransportDeleteCcrRestoreSessionAction(ThreadPool threadPool, ClusterService clusterService, ActionFilters actionFilters, - IndexNameExpressionResolver resolver, TransportService transportService, - IndicesService indicesService, CcrRestoreSourceService ccrRestoreService) { - super(NAME, threadPool, clusterService, transportService, actionFilters, resolver, DeleteCcrRestoreSessionRequest::new, - ThreadPool.Names.GENERIC); + TransportService transportService, IndicesService indicesService, + CcrRestoreSourceService ccrRestoreService) { + super(NAME, threadPool, clusterService, transportService, actionFilters, DeleteCcrRestoreSessionRequest::new, + DeleteCcrRestoreSessionRequest.DeleteRequest::new, ThreadPool.Names.GENERIC, DeleteResponse.class); this.indicesService = indicesService; this.ccrRestoreService = ccrRestoreService; } @Override - protected DeleteCcrRestoreSessionResponse shardOperation(DeleteCcrRestoreSessionRequest request, ShardId shardId) { + protected DeleteCcrRestoreSessionResponse newResponse(DeleteCcrRestoreSessionRequest request, List deleteResponses, + List failures) { + return new DeleteCcrRestoreSessionResponse(clusterService.getClusterName(), deleteResponses, failures); + } + + @Override + protected DeleteCcrRestoreSessionRequest.DeleteRequest newNodeRequest(String nodeId, DeleteCcrRestoreSessionRequest request) { + return null; + } + + @Override + protected DeleteResponse newNodeResponse() { + return null; + } + + @Override + protected DeleteResponse nodeOperation(DeleteCcrRestoreSessionRequest.DeleteRequest request) { + ShardId shardId = null; + String sessionUUID = ""; IndexShard indexShard = indicesService.getShardOrNull(shardId); if (indexShard == null) { throw new ShardNotFoundException(shardId); } - ccrRestoreService.closeSession(request.getSessionUUID(), indexShard); - return new DeleteCcrRestoreSessionResponse(); + ccrRestoreService.closeSession(sessionUUID, indexShard); + return new DeleteResponse(clusterService.localNode()); } + } - @Override - protected DeleteCcrRestoreSessionResponse newResponse() { - return new DeleteCcrRestoreSessionResponse(); + public static class DeleteResponse extends BaseNodeResponse { + + private DeleteResponse() { + } + + private DeleteResponse(StreamInput streamInput) throws IOException { + readFrom(streamInput); + } + + private DeleteResponse(DiscoveryNode node) { + super(node); } @Override - protected boolean resolveIndex(DeleteCcrRestoreSessionRequest request) { - return false; + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); } @Override - protected ShardsIterator shards(ClusterState state, InternalRequest request) { - final ShardId shardId = request.request().getShardId(); - return state.routingTable().shardRoutingTable(shardId).primaryShardIt(); + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); } } - public static class DeleteCcrRestoreSessionResponse extends ActionResponse { + public static class DeleteCcrRestoreSessionResponse extends BaseNodesResponse { + + DeleteCcrRestoreSessionResponse(StreamInput streamInput) throws IOException { + readFrom(streamInput); + } + + DeleteCcrRestoreSessionResponse(ClusterName clusterName, List chunkResponses, List failures) { + super(clusterName, chunkResponses, failures); + } + + @Override + protected List readNodesFrom(StreamInput in) throws IOException { + return in.readList(DeleteResponse::new); + } + + @Override + protected void writeNodesTo(StreamOutput out, List nodes) throws IOException { + out.writeList(nodes); + } + + public void getThing() { + if (hasFailures()) { + throw failures().get(0); + } + } } } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/DeleteCcrRestoreSessionRequest.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/DeleteCcrRestoreSessionRequest.java index caf91df9c1fb0..2ff868c76af25 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/DeleteCcrRestoreSessionRequest.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/DeleteCcrRestoreSessionRequest.java @@ -7,6 +7,8 @@ package org.elasticsearch.xpack.ccr.action.repositories; import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.support.nodes.BaseNodeRequest; +import org.elasticsearch.action.support.nodes.BaseNodesRequest; import org.elasticsearch.action.support.single.shard.SingleShardRequest; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -14,7 +16,7 @@ import java.io.IOException; -public class DeleteCcrRestoreSessionRequest extends SingleShardRequest { +public class DeleteCcrRestoreSessionRequest extends BaseNodesRequest { private String sessionUUID; private ShardId shardId; @@ -54,4 +56,11 @@ public String getSessionUUID() { public ShardId getShardId() { return shardId; } + + static class DeleteRequest extends BaseNodeRequest { + + DeleteRequest(){ + + } + } } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRestoreSourceService.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRestoreSourceService.java index 5898ce369fc92..cbb723496c80c 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRestoreSourceService.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRestoreSourceService.java @@ -15,8 +15,10 @@ import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.index.engine.Engine; +import org.elasticsearch.index.shard.IllegalIndexShardStateException; import org.elasticsearch.index.shard.IndexEventListener; import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.index.shard.IndexShardState; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.store.Store; @@ -104,6 +106,11 @@ public synchronized Store.MetadataSnapshot openSession(String sessionUUID, Index logger.debug("session [{}] already exists", sessionUUID); commit = onGoingRestores.get(sessionUUID); } else { + // TODO: Add test + if (indexShard.state() == IndexShardState.CLOSED) { + throw new IllegalIndexShardStateException(indexShard.shardId(), IndexShardState.CLOSED, + "cannot open ccr restore session if shard closed"); + } commit = indexShard.acquireSafeIndexCommit(); onGoingRestores.put(sessionUUID, commit); openSessionListeners.forEach(c -> c.accept(sessionUUID)); From 8d1a151474f8a948082018e89c3c40883cd698ef Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Thu, 13 Dec 2018 14:17:10 -0700 Subject: [PATCH 08/14] Changes --- .../java/org/elasticsearch/xpack/ccr/Ccr.java | 6 +- ...java => ClearCcrRestoreSessionAction.java} | 60 +++++++-------- .../ClearCcrRestoreSessionRequest.java | 77 +++++++++++++++++++ .../DeleteCcrRestoreSessionRequest.java | 66 ---------------- .../xpack/ccr/repository/CcrRepository.java | 37 +++------ 5 files changed, 118 insertions(+), 128 deletions(-) rename x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/{DeleteCcrRestoreSessionAction.java => ClearCcrRestoreSessionAction.java} (57%) create mode 100644 x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/ClearCcrRestoreSessionRequest.java delete mode 100644 x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/DeleteCcrRestoreSessionRequest.java diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java index 3b270ad11362a..acb56f7520374 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java @@ -58,7 +58,7 @@ import org.elasticsearch.xpack.ccr.action.TransportUnfollowAction; import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsAction; import org.elasticsearch.xpack.ccr.action.bulk.TransportBulkShardOperationsAction; -import org.elasticsearch.xpack.ccr.action.repositories.DeleteCcrRestoreSessionAction; +import org.elasticsearch.xpack.ccr.action.repositories.ClearCcrRestoreSessionAction; import org.elasticsearch.xpack.ccr.action.repositories.DeleteInternalCcrRepositoryAction; import org.elasticsearch.xpack.ccr.action.repositories.PutCcrRestoreSessionAction; import org.elasticsearch.xpack.ccr.action.repositories.PutInternalCcrRepositoryAction; @@ -188,8 +188,8 @@ public List> getPersistentTasksExecutor(ClusterServic DeleteInternalCcrRepositoryAction.TransportDeleteInternalRepositoryAction.class), new ActionHandler<>(PutCcrRestoreSessionAction.INSTANCE, PutCcrRestoreSessionAction.TransportPutCcrRestoreSessionAction.class), - new ActionHandler<>(DeleteCcrRestoreSessionAction.INSTANCE, - DeleteCcrRestoreSessionAction.TransportDeleteCcrRestoreSessionAction.class), + new ActionHandler<>(ClearCcrRestoreSessionAction.INSTANCE, + ClearCcrRestoreSessionAction.TransportDeleteCcrRestoreSessionAction.class), // stats action new ActionHandler<>(FollowStatsAction.INSTANCE, TransportFollowStatsAction.class), new ActionHandler<>(CcrStatsAction.INSTANCE, TransportCcrStatsAction.class), diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/DeleteCcrRestoreSessionAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/ClearCcrRestoreSessionAction.java similarity index 57% rename from x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/DeleteCcrRestoreSessionAction.java rename to x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/ClearCcrRestoreSessionAction.java index 2e4bd11b7f744..18624509f7e74 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/DeleteCcrRestoreSessionAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/ClearCcrRestoreSessionAction.java @@ -29,22 +29,22 @@ import java.io.IOException; import java.util.List; -public class DeleteCcrRestoreSessionAction extends Action { +public class ClearCcrRestoreSessionAction extends Action { - public static final DeleteCcrRestoreSessionAction INSTANCE = new DeleteCcrRestoreSessionAction(); - private static final String NAME = "internal:admin/ccr/restore/session/delete"; + public static final ClearCcrRestoreSessionAction INSTANCE = new ClearCcrRestoreSessionAction(); + private static final String NAME = "internal:admin/ccr/restore/session/clear"; - private DeleteCcrRestoreSessionAction() { + private ClearCcrRestoreSessionAction() { super(NAME); } @Override - public DeleteCcrRestoreSessionResponse newResponse() { + public ClearCcrRestoreSessionResponse newResponse() { throw new UnsupportedOperationException(); } - public static class TransportDeleteCcrRestoreSessionAction extends TransportNodesAction { + public static class TransportDeleteCcrRestoreSessionAction extends TransportNodesAction { private final IndicesService indicesService; private final CcrRestoreSourceService ccrRestoreService; @@ -53,51 +53,47 @@ public static class TransportDeleteCcrRestoreSessionAction extends TransportNode public TransportDeleteCcrRestoreSessionAction(ThreadPool threadPool, ClusterService clusterService, ActionFilters actionFilters, TransportService transportService, IndicesService indicesService, CcrRestoreSourceService ccrRestoreService) { - super(NAME, threadPool, clusterService, transportService, actionFilters, DeleteCcrRestoreSessionRequest::new, - DeleteCcrRestoreSessionRequest.DeleteRequest::new, ThreadPool.Names.GENERIC, DeleteResponse.class); + super(NAME, threadPool, clusterService, transportService, actionFilters, ClearCcrRestoreSessionRequest::new, + ClearCcrRestoreSessionRequest.Request::new, ThreadPool.Names.GENERIC, Response.class); this.indicesService = indicesService; this.ccrRestoreService = ccrRestoreService; } @Override - protected DeleteCcrRestoreSessionResponse newResponse(DeleteCcrRestoreSessionRequest request, List deleteResponses, - List failures) { - return new DeleteCcrRestoreSessionResponse(clusterService.getClusterName(), deleteResponses, failures); + protected ClearCcrRestoreSessionResponse newResponse(ClearCcrRestoreSessionRequest request, List deleteResponses, + List failures) { + return new ClearCcrRestoreSessionResponse(clusterService.getClusterName(), deleteResponses, failures); } @Override - protected DeleteCcrRestoreSessionRequest.DeleteRequest newNodeRequest(String nodeId, DeleteCcrRestoreSessionRequest request) { + protected ClearCcrRestoreSessionRequest.Request newNodeRequest(String nodeId, ClearCcrRestoreSessionRequest request) { return null; } @Override - protected DeleteResponse newNodeResponse() { + protected Response newNodeResponse() { return null; } @Override - protected DeleteResponse nodeOperation(DeleteCcrRestoreSessionRequest.DeleteRequest request) { - ShardId shardId = null; - String sessionUUID = ""; + protected Response nodeOperation(ClearCcrRestoreSessionRequest.Request request) { + ShardId shardId = request.getShardId(); IndexShard indexShard = indicesService.getShardOrNull(shardId); if (indexShard == null) { throw new ShardNotFoundException(shardId); } - ccrRestoreService.closeSession(sessionUUID, indexShard); - return new DeleteResponse(clusterService.localNode()); + ccrRestoreService.closeSession(request.getSessionUUID(), indexShard); + return new Response(clusterService.localNode()); } } - public static class DeleteResponse extends BaseNodeResponse { + public static class Response extends BaseNodeResponse { - private DeleteResponse() { - } - - private DeleteResponse(StreamInput streamInput) throws IOException { + private Response(StreamInput streamInput) throws IOException { readFrom(streamInput); } - private DeleteResponse(DiscoveryNode node) { + private Response(DiscoveryNode node) { super(node); } @@ -112,23 +108,19 @@ public void readFrom(StreamInput in) throws IOException { } } - public static class DeleteCcrRestoreSessionResponse extends BaseNodesResponse { - - DeleteCcrRestoreSessionResponse(StreamInput streamInput) throws IOException { - readFrom(streamInput); - } + public static class ClearCcrRestoreSessionResponse extends BaseNodesResponse { - DeleteCcrRestoreSessionResponse(ClusterName clusterName, List chunkResponses, List failures) { + ClearCcrRestoreSessionResponse(ClusterName clusterName, List chunkResponses, List failures) { super(clusterName, chunkResponses, failures); } @Override - protected List readNodesFrom(StreamInput in) throws IOException { - return in.readList(DeleteResponse::new); + protected List readNodesFrom(StreamInput in) throws IOException { + return in.readList(Response::new); } @Override - protected void writeNodesTo(StreamOutput out, List nodes) throws IOException { + protected void writeNodesTo(StreamOutput out, List nodes) throws IOException { out.writeList(nodes); } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/ClearCcrRestoreSessionRequest.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/ClearCcrRestoreSessionRequest.java new file mode 100644 index 0000000000000..449a2715e5cfd --- /dev/null +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/ClearCcrRestoreSessionRequest.java @@ -0,0 +1,77 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.ccr.action.repositories; + +import org.elasticsearch.action.support.nodes.BaseNodeRequest; +import org.elasticsearch.action.support.nodes.BaseNodesRequest; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.index.shard.ShardId; + +import java.io.IOException; + +public class ClearCcrRestoreSessionRequest extends BaseNodesRequest { + + private Request request; + + ClearCcrRestoreSessionRequest() { + } + + public ClearCcrRestoreSessionRequest(String nodeId, Request request) { + super(nodeId); + this.request = request; + } + + @Override + public void readFrom(StreamInput streamInput) throws IOException { + super.readFrom(streamInput); + request.readFrom(streamInput); + } + + @Override + public void writeTo(StreamOutput streamOutput) throws IOException { + super.writeTo(streamOutput); + request.writeTo(streamOutput); + } + + public static class Request extends BaseNodeRequest { + + private String sessionUUID; + private ShardId shardId; + + + Request() { + } + + public Request(String sessionUUID, ShardId shardId) { + this.sessionUUID = sessionUUID; + this.shardId = shardId; + } + + @Override + public void readFrom(StreamInput streamInput) throws IOException { + super.readFrom(streamInput); + sessionUUID = streamInput.readString(); + shardId = ShardId.readShardId(streamInput); + } + + @Override + public void writeTo(StreamOutput streamOutput) throws IOException { + super.writeTo(streamOutput); + streamOutput.writeString(sessionUUID); + shardId.writeTo(streamOutput); + } + + public String getSessionUUID() { + return sessionUUID; + } + + public ShardId getShardId() { + return shardId; + } + } +} diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/DeleteCcrRestoreSessionRequest.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/DeleteCcrRestoreSessionRequest.java deleted file mode 100644 index 2ff868c76af25..0000000000000 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/DeleteCcrRestoreSessionRequest.java +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ - -package org.elasticsearch.xpack.ccr.action.repositories; - -import org.elasticsearch.action.ActionRequestValidationException; -import org.elasticsearch.action.support.nodes.BaseNodeRequest; -import org.elasticsearch.action.support.nodes.BaseNodesRequest; -import org.elasticsearch.action.support.single.shard.SingleShardRequest; -import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.index.shard.ShardId; - -import java.io.IOException; - -public class DeleteCcrRestoreSessionRequest extends BaseNodesRequest { - - private String sessionUUID; - private ShardId shardId; - - DeleteCcrRestoreSessionRequest() { - } - - public DeleteCcrRestoreSessionRequest(String sessionUUID, ShardId shardId) { - super(shardId.getIndexName()); - this.sessionUUID = sessionUUID; - this.shardId = shardId; - } - - @Override - public ActionRequestValidationException validate() { - return null; - } - - @Override - public void readFrom(StreamInput streamInput) throws IOException { - super.readFrom(streamInput); - sessionUUID = streamInput.readString(); - shardId = ShardId.readShardId(streamInput); - } - - @Override - public void writeTo(StreamOutput streamOutput) throws IOException { - super.writeTo(streamOutput); - streamOutput.writeString(sessionUUID); - shardId.writeTo(streamOutput); - } - - public String getSessionUUID() { - return sessionUUID; - } - - public ShardId getShardId() { - return shardId; - } - - static class DeleteRequest extends BaseNodeRequest { - - DeleteRequest(){ - - } - } -} diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java index b68d8b0f83fd0..2772118fe8ee2 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java @@ -37,8 +37,8 @@ import org.elasticsearch.snapshots.SnapshotState; import org.elasticsearch.xpack.ccr.Ccr; import org.elasticsearch.xpack.ccr.CcrLicenseChecker; -import org.elasticsearch.xpack.ccr.action.repositories.DeleteCcrRestoreSessionAction; -import org.elasticsearch.xpack.ccr.action.repositories.DeleteCcrRestoreSessionRequest; +import org.elasticsearch.xpack.ccr.action.repositories.ClearCcrRestoreSessionAction; +import org.elasticsearch.xpack.ccr.action.repositories.ClearCcrRestoreSessionRequest; import org.elasticsearch.xpack.ccr.action.repositories.PutCcrRestoreSessionAction; import org.elasticsearch.xpack.ccr.action.repositories.PutCcrRestoreSessionRequest; @@ -257,26 +257,11 @@ public void restoreShard(IndexShard indexShard, SnapshotId snapshotId, Version v Client remoteClient = client.getRemoteClusterClient(remoteClusterAlias); String sessionUUID = UUIDs.randomBase64UUID(); boolean success = false; - try { - PutCcrRestoreSessionAction.PutCcrRestoreSessionResponse response = remoteClient.execute(PutCcrRestoreSessionAction.INSTANCE, - new PutCcrRestoreSessionRequest(sessionUUID, leaderShardId, recoveryMetadata)).actionGet(); - // The nodeId is necessary to route file chunk requests to appropriate node - String nodeId = response.getNodeId(); - - // TODO: Implement file restore - success = true; - } catch (Exception e) { - try { - closeSession(remoteClient, sessionUUID, leaderShardId); - } catch (Exception closeException) { - e.addSuppressed(closeException); - } - throw new IndexShardRecoveryException(shardId, "failed to recover from gateway", e); - } finally { - if (success) { - closeSession(remoteClient, sessionUUID, leaderShardId); - } - } + PutCcrRestoreSessionAction.PutCcrRestoreSessionResponse response = remoteClient.execute(PutCcrRestoreSessionAction.INSTANCE, + new PutCcrRestoreSessionRequest(sessionUUID, leaderShardId, recoveryMetadata)).actionGet(); + String nodeId = response.getNodeId(); + // TODO: Implement file restore + closeSession(remoteClient, nodeId, sessionUUID, leaderShardId); } @Override @@ -284,8 +269,10 @@ public IndexShardSnapshotStatus getShardSnapshotStatus(SnapshotId snapshotId, Ve throw new UnsupportedOperationException("Unsupported for repository of type: " + TYPE); } - private void closeSession(Client remoteClient, String sessionUUID, ShardId leaderShardId) { - DeleteCcrRestoreSessionRequest deleteRequest = new DeleteCcrRestoreSessionRequest(sessionUUID, leaderShardId); - remoteClient.execute(DeleteCcrRestoreSessionAction.INSTANCE, deleteRequest).actionGet(); + private void closeSession(Client remoteClient, String nodeId, String sessionUUID, ShardId leaderShardId) { + ClearCcrRestoreSessionRequest.Request nodRequest = + new ClearCcrRestoreSessionRequest.Request(sessionUUID, leaderShardId); + ClearCcrRestoreSessionRequest deleteRequest = new ClearCcrRestoreSessionRequest(nodeId, nodRequest); + remoteClient.execute(ClearCcrRestoreSessionAction.INSTANCE, deleteRequest).actionGet(); } } From 500350c799b455a473b1ddf815954e3aa451ee2d Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Thu, 13 Dec 2018 14:35:19 -0700 Subject: [PATCH 09/14] Cleanup --- .../ClearCcrRestoreSessionAction.java | 21 +++++++++++-------- .../ClearCcrRestoreSessionRequest.java | 8 ++++++- .../xpack/ccr/repository/CcrRepository.java | 12 ++++++----- 3 files changed, 26 insertions(+), 15 deletions(-) diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/ClearCcrRestoreSessionAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/ClearCcrRestoreSessionAction.java index 18624509f7e74..03997c800c430 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/ClearCcrRestoreSessionAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/ClearCcrRestoreSessionAction.java @@ -40,7 +40,7 @@ private ClearCcrRestoreSessionAction() { @Override public ClearCcrRestoreSessionResponse newResponse() { - throw new UnsupportedOperationException(); + return new ClearCcrRestoreSessionResponse(); } public static class TransportDeleteCcrRestoreSessionAction extends TransportNodesAction { + ClearCcrRestoreSessionResponse() { + } + ClearCcrRestoreSessionResponse(ClusterName clusterName, List chunkResponses, List failures) { super(clusterName, chunkResponses, failures); } @@ -123,11 +132,5 @@ protected List readNodesFrom(StreamInput in) throws IOException { protected void writeNodesTo(StreamOutput out, List nodes) throws IOException { out.writeList(nodes); } - - public void getThing() { - if (hasFailures()) { - throw failures().get(0); - } - } } } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/ClearCcrRestoreSessionRequest.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/ClearCcrRestoreSessionRequest.java index 449a2715e5cfd..f888ca93cc559 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/ClearCcrRestoreSessionRequest.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/ClearCcrRestoreSessionRequest.java @@ -29,6 +29,7 @@ public ClearCcrRestoreSessionRequest(String nodeId, Request request) { @Override public void readFrom(StreamInput streamInput) throws IOException { super.readFrom(streamInput); + request = new Request(); request.readFrom(streamInput); } @@ -38,6 +39,10 @@ public void writeTo(StreamOutput streamOutput) throws IOException { request.writeTo(streamOutput); } + public Request getRequest() { + return request; + } + public static class Request extends BaseNodeRequest { private String sessionUUID; @@ -47,7 +52,8 @@ public static class Request extends BaseNodeRequest { Request() { } - public Request(String sessionUUID, ShardId shardId) { + public Request(String nodeId, String sessionUUID, ShardId shardId) { + super(nodeId); this.sessionUUID = sessionUUID; this.shardId = shardId; } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java index 2772118fe8ee2..d45f116fed94c 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java @@ -256,7 +256,6 @@ public void restoreShard(IndexShard indexShard, SnapshotId snapshotId, Version v Client remoteClient = client.getRemoteClusterClient(remoteClusterAlias); String sessionUUID = UUIDs.randomBase64UUID(); - boolean success = false; PutCcrRestoreSessionAction.PutCcrRestoreSessionResponse response = remoteClient.execute(PutCcrRestoreSessionAction.INSTANCE, new PutCcrRestoreSessionRequest(sessionUUID, leaderShardId, recoveryMetadata)).actionGet(); String nodeId = response.getNodeId(); @@ -270,9 +269,12 @@ public IndexShardSnapshotStatus getShardSnapshotStatus(SnapshotId snapshotId, Ve } private void closeSession(Client remoteClient, String nodeId, String sessionUUID, ShardId leaderShardId) { - ClearCcrRestoreSessionRequest.Request nodRequest = - new ClearCcrRestoreSessionRequest.Request(sessionUUID, leaderShardId); - ClearCcrRestoreSessionRequest deleteRequest = new ClearCcrRestoreSessionRequest(nodeId, nodRequest); - remoteClient.execute(ClearCcrRestoreSessionAction.INSTANCE, deleteRequest).actionGet(); + ClearCcrRestoreSessionRequest clearRequest = new ClearCcrRestoreSessionRequest(nodeId, + new ClearCcrRestoreSessionRequest.Request(nodeId, sessionUUID, leaderShardId)); + ClearCcrRestoreSessionAction.ClearCcrRestoreSessionResponse response = + remoteClient.execute(ClearCcrRestoreSessionAction.INSTANCE, clearRequest).actionGet(); + if (response.hasFailures()) { + throw response.failures().get(0); + } } } From 5f9c1f3a6ad06932fe6561ea914abf5ae9a5458f Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Thu, 13 Dec 2018 15:00:52 -0700 Subject: [PATCH 10/14] WIP --- .../ClearCcrRestoreSessionAction.java | 3 -- .../repository/CcrRestoreSourceService.java | 29 +++++++++++-------- .../CcrRestoreSourceServiceTests.java | 10 ++++++- 3 files changed, 26 insertions(+), 16 deletions(-) diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/ClearCcrRestoreSessionAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/ClearCcrRestoreSessionAction.java index 03997c800c430..afddb244634f4 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/ClearCcrRestoreSessionAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/ClearCcrRestoreSessionAction.java @@ -82,9 +82,6 @@ protected Response nodeOperation(ClearCcrRestoreSessionRequest.Request request) if (indexShard == null) { throw new ShardNotFoundException(shardId); } - if (true) { - throw new UnsupportedOperationException(); - } ccrRestoreService.closeSession(request.getSessionUUID(), indexShard); return new Response(clusterService.localNode()); } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRestoreSourceService.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRestoreSourceService.java index cbb723496c80c..a475f9f4f2e67 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRestoreSourceService.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRestoreSourceService.java @@ -106,7 +106,6 @@ public synchronized Store.MetadataSnapshot openSession(String sessionUUID, Index logger.debug("session [{}] already exists", sessionUUID); commit = onGoingRestores.get(sessionUUID); } else { - // TODO: Add test if (indexShard.state() == IndexShardState.CLOSED) { throw new IllegalIndexShardStateException(indexShard.shardId(), IndexShardState.CLOSED, "cannot open ccr restore session if shard closed"); @@ -117,14 +116,9 @@ public synchronized Store.MetadataSnapshot openSession(String sessionUUID, Index HashSet sessions = sessionsForShard.computeIfAbsent(indexShard, (s) -> new HashSet<>()); sessions.add(sessionUUID); } - indexShard.store().incRef(); - try { - Store.MetadataSnapshot metadata = indexShard.store().getMetadata(commit.getIndexCommit()); - success = true; - return metadata; - } finally { - indexShard.store().decRef(); - } + Store.MetadataSnapshot metaData = getMetaData(indexShard, commit); + success = true; + return metaData; } finally { if (success == false) { onGoingRestores.remove(sessionUUID); @@ -146,11 +140,22 @@ public synchronized void closeSession(String sessionUUID, IndexShard indexShard) IOUtils.closeWhileHandlingException(commit); } + private Store.MetadataSnapshot getMetaData(IndexShard indexShard, Engine.IndexCommitRef commit) throws IOException { + indexShard.store().incRef(); + try { + return indexShard.store().getMetadata(commit.getIndexCommit()); + } finally { + indexShard.store().decRef(); + } + } + private void removeSessionForShard(String sessionUUID, IndexShard indexShard) { HashSet sessions = sessionsForShard.get(indexShard); - sessions.remove(sessionUUID); - if (sessions.isEmpty()) { - sessionsForShard.remove(indexShard); + if (sessions != null) { + sessions.remove(sessionUUID); + if (sessions.isEmpty()) { + sessionsForShard.remove(indexShard); + } } } } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/repository/CcrRestoreSourceServiceTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/repository/CcrRestoreSourceServiceTests.java index a8be8db2d9e12..21177ae7e2b0a 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/repository/CcrRestoreSourceServiceTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/repository/CcrRestoreSourceServiceTests.java @@ -8,6 +8,7 @@ import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.shard.IllegalIndexShardStateException; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShardTestCase; import org.junit.Before; @@ -57,6 +58,14 @@ public void testOpenSession() throws IOException { closeShards(indexShard1, indexShard2); } + public void testCannotOpenSessionForClosedShard() throws IOException { + IndexShard indexShard = newStartedShard(true); + closeShards(indexShard); + String sessionUUID = UUIDs.randomBase64UUID(); + expectThrows(IllegalIndexShardStateException.class, () -> restoreSourceService.openSession(sessionUUID, indexShard)); + assertNull(restoreSourceService.getIndexCommit(sessionUUID)); + } + public void testCloseSession() throws IOException { IndexShard indexShard1 = newStartedShard(true); IndexShard indexShard2 = newStartedShard(true); @@ -113,6 +122,5 @@ public void testCloseShardListenerFunctionality() throws IOException { restoreSourceService.closeSession(sessionUUID3, indexShard2); closeShards(indexShard1, indexShard2); - } } From 17a5f2452851cb8712193d39f95275369902431b Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Thu, 13 Dec 2018 15:19:49 -0700 Subject: [PATCH 11/14] Add listener --- .../main/java/org/elasticsearch/xpack/ccr/Ccr.java | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java index acb56f7520374..9bc4535a1d596 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java @@ -24,6 +24,7 @@ import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.env.Environment; import org.elasticsearch.env.NodeEnvironment; +import org.elasticsearch.index.IndexModule; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.engine.EngineFactory; import org.elasticsearch.license.XPackLicenseState; @@ -115,6 +116,7 @@ public class Ccr extends Plugin implements ActionPlugin, PersistentTaskPlugin, E private final Settings settings; private final CcrLicenseChecker ccrLicenseChecker; private final SetOnce repositoryManager = new SetOnce<>(); + private final SetOnce restoreSourceService = new SetOnce<>(); private Client client; /** @@ -156,10 +158,11 @@ public Collection createComponents( } this.repositoryManager.set(new CcrRepositoryManager(settings, clusterService, client)); - + CcrRestoreSourceService restoreSourceService = new CcrRestoreSourceService(settings); + this.restoreSourceService.set(restoreSourceService); return Arrays.asList( ccrLicenseChecker, - new CcrRestoreSourceService(settings), + restoreSourceService, new AutoFollowCoordinator(client, clusterService, ccrLicenseChecker) ); } @@ -289,6 +292,11 @@ public Map getInternalRepositories(Environment env, return Collections.singletonMap(CcrRepository.TYPE, repositoryFactory); } + @Override + public void onIndexModule(IndexModule indexModule) { + indexModule.addIndexEventListener(this.restoreSourceService.get()); + } + protected XPackLicenseState getLicenseState() { return XPackPlugin.getSharedLicenseState(); } } From be2e6e808c933067f70e6bcd1dd4ae6e15a6e4e7 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Thu, 13 Dec 2018 16:03:06 -0700 Subject: [PATCH 12/14] cleanups --- .../ClearCcrRestoreSessionAction.java | 20 +---- .../ClearCcrRestoreSessionRequest.java | 12 +-- .../PutCcrRestoreSessionAction.java | 2 +- .../xpack/ccr/repository/CcrRepository.java | 6 +- .../repository/CcrRestoreSourceService.java | 87 +++++++++++-------- .../CcrRestoreSourceServiceTests.java | 33 ++++--- 6 files changed, 76 insertions(+), 84 deletions(-) diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/ClearCcrRestoreSessionAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/ClearCcrRestoreSessionAction.java index afddb244634f4..531413002fffd 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/ClearCcrRestoreSessionAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/ClearCcrRestoreSessionAction.java @@ -18,10 +18,6 @@ import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.index.shard.IndexShard; -import org.elasticsearch.index.shard.ShardId; -import org.elasticsearch.index.shard.ShardNotFoundException; -import org.elasticsearch.indices.IndicesService; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.ccr.repository.CcrRestoreSourceService; @@ -46,23 +42,20 @@ public ClearCcrRestoreSessionResponse newResponse() { public static class TransportDeleteCcrRestoreSessionAction extends TransportNodesAction { - private final IndicesService indicesService; private final CcrRestoreSourceService ccrRestoreService; @Inject public TransportDeleteCcrRestoreSessionAction(ThreadPool threadPool, ClusterService clusterService, ActionFilters actionFilters, - TransportService transportService, IndicesService indicesService, - CcrRestoreSourceService ccrRestoreService) { + TransportService transportService, CcrRestoreSourceService ccrRestoreService) { super(NAME, threadPool, clusterService, transportService, actionFilters, ClearCcrRestoreSessionRequest::new, ClearCcrRestoreSessionRequest.Request::new, ThreadPool.Names.GENERIC, Response.class); - this.indicesService = indicesService; this.ccrRestoreService = ccrRestoreService; } @Override - protected ClearCcrRestoreSessionResponse newResponse(ClearCcrRestoreSessionRequest request, List deleteResponses, + protected ClearCcrRestoreSessionResponse newResponse(ClearCcrRestoreSessionRequest request, List responses, List failures) { - return new ClearCcrRestoreSessionResponse(clusterService.getClusterName(), deleteResponses, failures); + return new ClearCcrRestoreSessionResponse(clusterService.getClusterName(), responses, failures); } @Override @@ -77,12 +70,7 @@ protected Response newNodeResponse() { @Override protected Response nodeOperation(ClearCcrRestoreSessionRequest.Request request) { - ShardId shardId = request.getShardId(); - IndexShard indexShard = indicesService.getShardOrNull(shardId); - if (indexShard == null) { - throw new ShardNotFoundException(shardId); - } - ccrRestoreService.closeSession(request.getSessionUUID(), indexShard); + ccrRestoreService.closeSession(request.getSessionUUID()); return new Response(clusterService.localNode()); } } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/ClearCcrRestoreSessionRequest.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/ClearCcrRestoreSessionRequest.java index f888ca93cc559..7905f0f796441 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/ClearCcrRestoreSessionRequest.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/ClearCcrRestoreSessionRequest.java @@ -10,7 +10,6 @@ import org.elasticsearch.action.support.nodes.BaseNodesRequest; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.index.shard.ShardId; import java.io.IOException; @@ -46,38 +45,29 @@ public Request getRequest() { public static class Request extends BaseNodeRequest { private String sessionUUID; - private ShardId shardId; - Request() { } - public Request(String nodeId, String sessionUUID, ShardId shardId) { + public Request(String nodeId, String sessionUUID) { super(nodeId); this.sessionUUID = sessionUUID; - this.shardId = shardId; } @Override public void readFrom(StreamInput streamInput) throws IOException { super.readFrom(streamInput); sessionUUID = streamInput.readString(); - shardId = ShardId.readShardId(streamInput); } @Override public void writeTo(StreamOutput streamOutput) throws IOException { super.writeTo(streamOutput); streamOutput.writeString(sessionUUID); - shardId.writeTo(streamOutput); } public String getSessionUUID() { return sessionUUID; } - - public ShardId getShardId() { - return shardId; - } } } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/PutCcrRestoreSessionAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/PutCcrRestoreSessionAction.java index 573f3eeee07da..ab6793eed6059 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/PutCcrRestoreSessionAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/PutCcrRestoreSessionAction.java @@ -39,7 +39,7 @@ private PutCcrRestoreSessionAction() { @Override public PutCcrRestoreSessionResponse newResponse() { - throw new UnsupportedOperationException(); + return new PutCcrRestoreSessionResponse(); } @Override diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java index d45f116fed94c..aeaa7fc5eaf57 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java @@ -260,7 +260,7 @@ public void restoreShard(IndexShard indexShard, SnapshotId snapshotId, Version v new PutCcrRestoreSessionRequest(sessionUUID, leaderShardId, recoveryMetadata)).actionGet(); String nodeId = response.getNodeId(); // TODO: Implement file restore - closeSession(remoteClient, nodeId, sessionUUID, leaderShardId); + closeSession(remoteClient, nodeId, sessionUUID); } @Override @@ -268,9 +268,9 @@ public IndexShardSnapshotStatus getShardSnapshotStatus(SnapshotId snapshotId, Ve throw new UnsupportedOperationException("Unsupported for repository of type: " + TYPE); } - private void closeSession(Client remoteClient, String nodeId, String sessionUUID, ShardId leaderShardId) { + private void closeSession(Client remoteClient, String nodeId, String sessionUUID) { ClearCcrRestoreSessionRequest clearRequest = new ClearCcrRestoreSessionRequest(nodeId, - new ClearCcrRestoreSessionRequest.Request(nodeId, sessionUUID, leaderShardId)); + new ClearCcrRestoreSessionRequest.Request(nodeId, sessionUUID)); ClearCcrRestoreSessionAction.ClearCcrRestoreSessionResponse response = remoteClient.execute(ClearCcrRestoreSessionAction.INSTANCE, clearRequest).actionGet(); if (response.hasFailures()) { diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRestoreSourceService.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRestoreSourceService.java index a475f9f4f2e67..c64c2c19e69ec 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRestoreSourceService.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRestoreSourceService.java @@ -22,6 +22,7 @@ import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.store.Store; +import java.io.Closeable; import java.io.IOException; import java.util.HashMap; import java.util.HashSet; @@ -33,7 +34,7 @@ public class CcrRestoreSourceService extends AbstractLifecycleComponent implemen private static final Logger logger = LogManager.getLogger(CcrRestoreSourceService.class); - private final Map onGoingRestores = ConcurrentCollections.newConcurrentMap(); + private final Map onGoingRestores = ConcurrentCollections.newConcurrentMap(); private final Map> sessionsForShard = new HashMap<>(); private final CopyOnWriteArrayList> openSessionListeners = new CopyOnWriteArrayList<>(); private final CopyOnWriteArrayList> closeSessionListeners = new CopyOnWriteArrayList<>(); @@ -42,17 +43,14 @@ public CcrRestoreSourceService(Settings settings) { super(settings); } - // TODO: Need to register with IndicesService @Override public synchronized void afterIndexShardClosed(ShardId shardId, @Nullable IndexShard indexShard, Settings indexSettings) { if (indexShard != null) { - logger.debug("shard [{}] closing, closing sessions", indexShard); HashSet sessions = sessionsForShard.remove(indexShard); if (sessions != null) { for (String sessionUUID : sessions) { - logger.debug("closing session [{}] for shard [{}]", sessionUUID, indexShard); - Engine.IndexCommitRef commit = onGoingRestores.remove(sessionUUID); - IOUtils.closeWhileHandlingException(commit); + RestoreContext restore = onGoingRestores.remove(sessionUUID); + IOUtils.closeWhileHandlingException(restore); } } } @@ -91,70 +89,87 @@ synchronized HashSet getSessionsForShard(IndexShard indexShard) { } // default visibility for testing - synchronized Engine.IndexCommitRef getIndexCommit(String sessionUUID) { + synchronized RestoreContext getOngoingRestore(String sessionUUID) { return onGoingRestores.get(sessionUUID); } // TODO: Add a local timeout for the session. This timeout might might be for the entire session to be // complete. Or it could be for session to have been touched. public synchronized Store.MetadataSnapshot openSession(String sessionUUID, IndexShard indexShard) throws IOException { - logger.debug("opening session [{}] for shard [{}]", sessionUUID, indexShard); boolean success = false; - Engine.IndexCommitRef commit = null; + RestoreContext restore = null; try { if (onGoingRestores.containsKey(sessionUUID)) { - logger.debug("session [{}] already exists", sessionUUID); - commit = onGoingRestores.get(sessionUUID); + logger.debug("not opening new session [{}] as it already exists", sessionUUID); + restore = onGoingRestores.get(sessionUUID); } else { + logger.debug("opening session [{}] for shard [{}]", sessionUUID, indexShard); if (indexShard.state() == IndexShardState.CLOSED) { throw new IllegalIndexShardStateException(indexShard.shardId(), IndexShardState.CLOSED, "cannot open ccr restore session if shard closed"); } - commit = indexShard.acquireSafeIndexCommit(); - onGoingRestores.put(sessionUUID, commit); + restore = new RestoreContext(sessionUUID, indexShard, indexShard.acquireSafeIndexCommit()); + onGoingRestores.put(sessionUUID, restore); openSessionListeners.forEach(c -> c.accept(sessionUUID)); HashSet sessions = sessionsForShard.computeIfAbsent(indexShard, (s) -> new HashSet<>()); sessions.add(sessionUUID); } - Store.MetadataSnapshot metaData = getMetaData(indexShard, commit); + Store.MetadataSnapshot metaData = restore.getMetaData(); success = true; return metaData; } finally { if (success == false) { onGoingRestores.remove(sessionUUID); - removeSessionForShard(sessionUUID, indexShard); - IOUtils.closeWhileHandlingException(commit); + IOUtils.closeWhileHandlingException(restore); } } } - public synchronized void closeSession(String sessionUUID, IndexShard indexShard) { - logger.debug("closing session [{}] for shard [{}]", sessionUUID, indexShard); + public synchronized void closeSession(String sessionUUID) { closeSessionListeners.forEach(c -> c.accept(sessionUUID)); - Engine.IndexCommitRef commit = onGoingRestores.remove(sessionUUID); - if (commit == null) { - logger.info("could not close session [{}] for shard [{}] because session not found", sessionUUID, indexShard); + RestoreContext restore = onGoingRestores.remove(sessionUUID); + if (restore == null) { + logger.info("could not close session [{}] because session not found", sessionUUID); throw new ElasticsearchException("session [" + sessionUUID + "] not found"); } - removeSessionForShard(sessionUUID, indexShard); - IOUtils.closeWhileHandlingException(commit); + IOUtils.closeWhileHandlingException(restore); } - private Store.MetadataSnapshot getMetaData(IndexShard indexShard, Engine.IndexCommitRef commit) throws IOException { - indexShard.store().incRef(); - try { - return indexShard.store().getMetadata(commit.getIndexCommit()); - } finally { - indexShard.store().decRef(); + private class RestoreContext implements Closeable { + + private final String sessionUUID; + private final IndexShard indexShard; + private final Engine.IndexCommitRef commitRef; + + private RestoreContext(String sessionUUID, IndexShard indexShard, Engine.IndexCommitRef commitRef) { + this.sessionUUID = sessionUUID; + this.indexShard = indexShard; + this.commitRef = commitRef; + } + + Store.MetadataSnapshot getMetaData() throws IOException { + indexShard.store().incRef(); + try { + return indexShard.store().getMetadata(commitRef.getIndexCommit()); + } finally { + indexShard.store().decRef(); + } + } + + @Override + public void close() { + removeSessionForShard(sessionUUID, indexShard); + IOUtils.closeWhileHandlingException(commitRef); } - } - private void removeSessionForShard(String sessionUUID, IndexShard indexShard) { - HashSet sessions = sessionsForShard.get(indexShard); - if (sessions != null) { - sessions.remove(sessionUUID); - if (sessions.isEmpty()) { - sessionsForShard.remove(indexShard); + private void removeSessionForShard(String sessionUUID, IndexShard indexShard) { + logger.debug("closing session [{}] for shard [{}]", sessionUUID, indexShard); + HashSet sessions = sessionsForShard.get(indexShard); + if (sessions != null) { + sessions.remove(sessionUUID); + if (sessions.isEmpty()) { + sessionsForShard.remove(indexShard); + } } } } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/repository/CcrRestoreSourceServiceTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/repository/CcrRestoreSourceServiceTests.java index 21177ae7e2b0a..dfa7e5ef12660 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/repository/CcrRestoreSourceServiceTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/repository/CcrRestoreSourceServiceTests.java @@ -26,7 +26,6 @@ public void setUp() throws Exception { restoreSourceService = new CcrRestoreSourceService(Settings.EMPTY); } - public void testOpenSession() throws IOException { IndexShard indexShard1 = newStartedShard(true); IndexShard indexShard2 = newStartedShard(true); @@ -51,9 +50,9 @@ public void testOpenSession() throws IOException { assertEquals(1, sessionsForShard.size()); assertTrue(sessionsForShard.contains(sessionUUID3)); - restoreSourceService.closeSession(sessionUUID1, indexShard1); - restoreSourceService.closeSession(sessionUUID2, indexShard1); - restoreSourceService.closeSession(sessionUUID3, indexShard2); + restoreSourceService.closeSession(sessionUUID1); + restoreSourceService.closeSession(sessionUUID2); + restoreSourceService.closeSession(sessionUUID3); closeShards(indexShard1, indexShard2); } @@ -63,7 +62,7 @@ public void testCannotOpenSessionForClosedShard() throws IOException { closeShards(indexShard); String sessionUUID = UUIDs.randomBase64UUID(); expectThrows(IllegalIndexShardStateException.class, () -> restoreSourceService.openSession(sessionUUID, indexShard)); - assertNull(restoreSourceService.getIndexCommit(sessionUUID)); + assertNull(restoreSourceService.getOngoingRestore(sessionUUID)); } public void testCloseSession() throws IOException { @@ -79,23 +78,23 @@ public void testCloseSession() throws IOException { assertEquals(2, restoreSourceService.getSessionsForShard(indexShard1).size()); assertEquals(1, restoreSourceService.getSessionsForShard(indexShard2).size()); - assertNotNull(restoreSourceService.getIndexCommit(sessionUUID1)); - assertNotNull(restoreSourceService.getIndexCommit(sessionUUID2)); - assertNotNull(restoreSourceService.getIndexCommit(sessionUUID3)); + assertNotNull(restoreSourceService.getOngoingRestore(sessionUUID1)); + assertNotNull(restoreSourceService.getOngoingRestore(sessionUUID2)); + assertNotNull(restoreSourceService.getOngoingRestore(sessionUUID3)); - restoreSourceService.closeSession(sessionUUID1, indexShard1); + restoreSourceService.closeSession(sessionUUID1); assertEquals(1, restoreSourceService.getSessionsForShard(indexShard1).size()); - assertNull(restoreSourceService.getIndexCommit(sessionUUID1)); + assertNull(restoreSourceService.getOngoingRestore(sessionUUID1)); assertFalse(restoreSourceService.getSessionsForShard(indexShard1).contains(sessionUUID1)); assertTrue(restoreSourceService.getSessionsForShard(indexShard1).contains(sessionUUID2)); - restoreSourceService.closeSession(sessionUUID2, indexShard1); + restoreSourceService.closeSession(sessionUUID2); assertNull(restoreSourceService.getSessionsForShard(indexShard1)); - assertNull(restoreSourceService.getIndexCommit(sessionUUID2)); + assertNull(restoreSourceService.getOngoingRestore(sessionUUID2)); - restoreSourceService.closeSession(sessionUUID3, indexShard2); + restoreSourceService.closeSession(sessionUUID3); assertNull(restoreSourceService.getSessionsForShard(indexShard2)); - assertNull(restoreSourceService.getIndexCommit(sessionUUID3)); + assertNull(restoreSourceService.getOngoingRestore(sessionUUID3)); closeShards(indexShard1, indexShard2); } @@ -117,10 +116,10 @@ public void testCloseShardListenerFunctionality() throws IOException { restoreSourceService.afterIndexShardClosed(indexShard1.shardId(), indexShard1, Settings.EMPTY); assertNull(restoreSourceService.getSessionsForShard(indexShard1)); - assertNull(restoreSourceService.getIndexCommit(sessionUUID1)); - assertNull(restoreSourceService.getIndexCommit(sessionUUID2)); + assertNull(restoreSourceService.getOngoingRestore(sessionUUID1)); + assertNull(restoreSourceService.getOngoingRestore(sessionUUID2)); - restoreSourceService.closeSession(sessionUUID3, indexShard2); + restoreSourceService.closeSession(sessionUUID3); closeShards(indexShard1, indexShard2); } } From e275823e5b5b9c2bb194bd67fd8d5d0bba8ed7c3 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Fri, 14 Dec 2018 17:01:43 -0700 Subject: [PATCH 13/14] Changes from review --- .../ClearCcrRestoreSessionAction.java | 4 ++-- .../ClearCcrRestoreSessionRequest.java | 12 +++++------ .../PutCcrRestoreSessionAction.java | 18 ++++++++--------- .../PutCcrRestoreSessionRequest.java | 20 +++++++++---------- .../repository/CcrRestoreSourceService.java | 4 ++-- 5 files changed, 29 insertions(+), 29 deletions(-) diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/ClearCcrRestoreSessionAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/ClearCcrRestoreSessionAction.java index 531413002fffd..33b8b415d8362 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/ClearCcrRestoreSessionAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/ClearCcrRestoreSessionAction.java @@ -80,8 +80,8 @@ public static class Response extends BaseNodeResponse { private Response() { } - private Response(StreamInput streamInput) throws IOException { - readFrom(streamInput); + private Response(StreamInput in) throws IOException { + readFrom(in); } private Response(DiscoveryNode node) { diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/ClearCcrRestoreSessionRequest.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/ClearCcrRestoreSessionRequest.java index 7905f0f796441..11605970736b0 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/ClearCcrRestoreSessionRequest.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/ClearCcrRestoreSessionRequest.java @@ -55,15 +55,15 @@ public Request(String nodeId, String sessionUUID) { } @Override - public void readFrom(StreamInput streamInput) throws IOException { - super.readFrom(streamInput); - sessionUUID = streamInput.readString(); + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + sessionUUID = in.readString(); } @Override - public void writeTo(StreamOutput streamOutput) throws IOException { - super.writeTo(streamOutput); - streamOutput.writeString(sessionUUID); + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeString(sessionUUID); } public String getSessionUUID() { diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/PutCcrRestoreSessionAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/PutCcrRestoreSessionAction.java index ab6793eed6059..7f362aa3b766c 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/PutCcrRestoreSessionAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/PutCcrRestoreSessionAction.java @@ -102,21 +102,21 @@ public static class PutCcrRestoreSessionResponse extends ActionResponse { this.nodeId = nodeId; } - PutCcrRestoreSessionResponse(StreamInput streamInput) throws IOException { - super(streamInput); - nodeId = streamInput.readString(); + PutCcrRestoreSessionResponse(StreamInput in) throws IOException { + super(in); + nodeId = in.readString(); } @Override - public void readFrom(StreamInput streamInput) throws IOException { - super.readFrom(streamInput); - nodeId = streamInput.readString(); + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + nodeId = in.readString(); } @Override - public void writeTo(StreamOutput streamOutput) throws IOException { - super.writeTo(streamOutput); - streamOutput.writeString(nodeId); + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeString(nodeId); } public String getNodeId() { diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/PutCcrRestoreSessionRequest.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/PutCcrRestoreSessionRequest.java index e9ad0181c4cb5..2b94193f674af 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/PutCcrRestoreSessionRequest.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/PutCcrRestoreSessionRequest.java @@ -37,19 +37,19 @@ public ActionRequestValidationException validate() { } @Override - public void readFrom(StreamInput streamInput) throws IOException { - super.readFrom(streamInput); - sessionUUID = streamInput.readString(); - shardId = ShardId.readShardId(streamInput); - metaData = new Store.MetadataSnapshot(streamInput); + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + sessionUUID = in.readString(); + shardId = ShardId.readShardId(in); + metaData = new Store.MetadataSnapshot(in); } @Override - public void writeTo(StreamOutput streamOutput) throws IOException { - super.writeTo(streamOutput); - streamOutput.writeString(sessionUUID); - shardId.writeTo(streamOutput); - metaData.writeTo(streamOutput); + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeString(sessionUUID); + shardId.writeTo(out); + metaData.writeTo(out); } public String getSessionUUID() { diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRestoreSourceService.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRestoreSourceService.java index c64c2c19e69ec..81e7bb3c53d0b 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRestoreSourceService.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRestoreSourceService.java @@ -8,7 +8,6 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.elasticsearch.ElasticsearchException; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.settings.Settings; @@ -130,7 +129,7 @@ public synchronized void closeSession(String sessionUUID) { RestoreContext restore = onGoingRestores.remove(sessionUUID); if (restore == null) { logger.info("could not close session [{}] because session not found", sessionUUID); - throw new ElasticsearchException("session [" + sessionUUID + "] not found"); + throw new IllegalArgumentException("session [" + sessionUUID + "] not found"); } IOUtils.closeWhileHandlingException(restore); } @@ -158,6 +157,7 @@ Store.MetadataSnapshot getMetaData() throws IOException { @Override public void close() { + assert Thread.holdsLock(CcrRestoreSourceService.this); removeSessionForShard(sessionUUID, indexShard); IOUtils.closeWhileHandlingException(commitRef); } From 8a6ae8da7c0a82b919f71f5904f013d2369c1f83 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Tue, 18 Dec 2018 09:02:26 -0700 Subject: [PATCH 14/14] Changes --- .../xpack/ccr/repository/CcrRestoreSourceService.java | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRestoreSourceService.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRestoreSourceService.java index 81e7bb3c53d0b..642036168ad7b 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRestoreSourceService.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRestoreSourceService.java @@ -14,9 +14,9 @@ import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.index.engine.Engine; -import org.elasticsearch.index.shard.IllegalIndexShardStateException; import org.elasticsearch.index.shard.IndexEventListener; import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.index.shard.IndexShardClosedException; import org.elasticsearch.index.shard.IndexShardState; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.store.Store; @@ -102,10 +102,9 @@ public synchronized Store.MetadataSnapshot openSession(String sessionUUID, Index logger.debug("not opening new session [{}] as it already exists", sessionUUID); restore = onGoingRestores.get(sessionUUID); } else { - logger.debug("opening session [{}] for shard [{}]", sessionUUID, indexShard); + logger.debug("opening session [{}] for shard [{}]", sessionUUID, indexShard.shardId()); if (indexShard.state() == IndexShardState.CLOSED) { - throw new IllegalIndexShardStateException(indexShard.shardId(), IndexShardState.CLOSED, - "cannot open ccr restore session if shard closed"); + throw new IndexShardClosedException(indexShard.shardId(), "cannot open ccr restore session if shard closed"); } restore = new RestoreContext(sessionUUID, indexShard, indexShard.acquireSafeIndexCommit()); onGoingRestores.put(sessionUUID, restore); @@ -163,7 +162,7 @@ public void close() { } private void removeSessionForShard(String sessionUUID, IndexShard indexShard) { - logger.debug("closing session [{}] for shard [{}]", sessionUUID, indexShard); + logger.debug("closing session [{}] for shard [{}]", sessionUUID, indexShard.shardId()); HashSet sessions = sessionsForShard.get(indexShard); if (sessions != null) { sessions.remove(sessionUUID);