Skip to content

Commit

Permalink
Port ES snapshotting code.
Browse files Browse the repository at this point in the history
- Fix two races condition that lead to stuck snapshots (elastic/elasticsearch#37686)
- Improve resilience SnapshotShardService (elastic/elasticsearch#36113)
- Fix concurrent snapshot ending and stabilize snapshot finalization (elastic/elasticsearch#38368)
  • Loading branch information
kovrus committed Apr 26, 2019
1 parent 7a1fda5 commit 38c4155
Show file tree
Hide file tree
Showing 13 changed files with 967 additions and 1,104 deletions.
2 changes: 2 additions & 0 deletions blackbox/docs/appendices/release-notes/unreleased.rst
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,8 @@ Deprecations
Changes
=======

- Improved resiliency of the :ref:`ref-create-snapshot` operation.

- Added support for `SQL Standard Timestamp Format
<https://crate.io/docs/sql-99/en/latest/chapters/08.html#timestamp-literal>`_
to the :ref:`date-time-types`.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,12 @@

package org.elasticsearch.action;

import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.common.CheckedConsumer;
import org.elasticsearch.common.CheckedFunction;

import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

Expand Down Expand Up @@ -49,8 +53,8 @@ public interface ActionListener<Response> {
* @return a listener that listens for responses and invokes the consumer when received
*/
static <Response> ActionListener<Response> wrap(CheckedConsumer<Response, ? extends Exception> onResponse,
Consumer<Exception> onFailure) {
return new ActionListener<Response>() {
Consumer<Exception> onFailure) {
return new ActionListener<>() {
@Override
public void onResponse(Response response) {
try {
Expand Down Expand Up @@ -79,6 +83,57 @@ static <Response> ActionListener<Response> wrap(Runnable runnable) {
return wrap(r -> runnable.run(), e -> runnable.run());
}

/**
* Notifies every given listener with the response passed to {@link #onResponse(Object)}. If a listener itself throws an exception
* the exception is forwarded to {@link #onFailure(Exception)}. If in turn {@link #onFailure(Exception)} fails all remaining
* listeners will be processed and the caught exception will be re-thrown.
*/
static <Response> void onResponse(Iterable<ActionListener<Response>> listeners, Response response) {
List<Exception> exceptionList = new ArrayList<>();
for (ActionListener<Response> listener : listeners) {
try {
listener.onResponse(response);
} catch (Exception ex) {
try {
listener.onFailure(ex);
} catch (Exception ex1) {
exceptionList.add(ex1);
}
}
}
ExceptionsHelper.maybeThrowRuntimeAndSuppress(exceptionList);
}

/**
* Notifies every given listener with the failure passed to {@link #onFailure(Exception)}. If a listener itself throws an exception
* all remaining listeners will be processed and the caught exception will be re-thrown.
*/
static <Response> void onFailure(Iterable<ActionListener<Response>> listeners, Exception failure) {
List<Exception> exceptionList = new ArrayList<>();
for (ActionListener<Response> listener : listeners) {
try {
listener.onFailure(failure);
} catch (Exception ex) {
exceptionList.add(ex);
}
}
ExceptionsHelper.maybeThrowRuntimeAndSuppress(exceptionList);
}

/**
* Creates a listener that wraps another listener, mapping response values via the given mapping function and passing along
* exceptions to the delegate.
*
* @param listener Listener to delegate to
* @param fn Function to apply to listener response
* @param <Response> Response type of the new listener
* @param <T> Response type of the wrapped listener
* @return a listener that maps the received response and then passes it to its delegate listener
*/
static <T, Response> ActionListener<Response> map(ActionListener<T> listener, CheckedFunction<Response, T, Exception> fn) {
return wrap(r -> listener.onResponse(fn.apply(r)), listener::onFailure);
}

/**
* Converts a listener to a {@link BiConsumer} for compatibility with the {@link java.util.concurrent.CompletableFuture}
* api.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@
* </ul>
*/
public class CreateSnapshotRequest extends MasterNodeRequest<CreateSnapshotRequest>
implements IndicesRequest.Replaceable, ToXContentObject {
implements IndicesRequest.Replaceable, ToXContentObject {

private String snapshot;

Expand Down Expand Up @@ -94,6 +94,31 @@ public CreateSnapshotRequest(String repository, String snapshot) {
this.repository = repository;
}

public CreateSnapshotRequest(StreamInput in) throws IOException {
super(in);
snapshot = in.readString();
repository = in.readString();
indices = in.readStringArray();
indicesOptions = IndicesOptions.readIndicesOptions(in);
settings = readSettingsFromStream(in);
includeGlobalState = in.readBoolean();
waitForCompletion = in.readBoolean();
partial = in.readBoolean();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(snapshot);
out.writeString(repository);
out.writeStringArray(indices);
indicesOptions.writeIndicesOptions(out);
writeSettingsToStream(settings, out);
out.writeBoolean(includeGlobalState);
out.writeBoolean(waitForCompletion);
out.writeBoolean(partial);
}

@Override
public ActionRequestValidationException validate() {
ActionRequestValidationException validationException = null;
Expand Down Expand Up @@ -414,28 +439,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws

@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
snapshot = in.readString();
repository = in.readString();
indices = in.readStringArray();
indicesOptions = IndicesOptions.readIndicesOptions(in);
settings = readSettingsFromStream(in);
includeGlobalState = in.readBoolean();
waitForCompletion = in.readBoolean();
partial = in.readBoolean();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(snapshot);
out.writeString(repository);
out.writeStringArray(indices);
indicesOptions.writeIndicesOptions(out);
writeSettingsToStream(settings, out);
out.writeBoolean(includeGlobalState);
out.writeBoolean(waitForCompletion);
out.writeBoolean(partial);
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,6 @@
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.snapshots.Snapshot;
import org.elasticsearch.snapshots.SnapshotInfo;
import org.elasticsearch.snapshots.SnapshotsService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
Expand All @@ -40,12 +38,11 @@ public class TransportCreateSnapshotAction extends TransportMasterNodeAction<Cre
private final SnapshotsService snapshotsService;

@Inject
public TransportCreateSnapshotAction(TransportService transportService,
ClusterService clusterService,
ThreadPool threadPool,
SnapshotsService snapshotsService,
public TransportCreateSnapshotAction(TransportService transportService, ClusterService clusterService,
ThreadPool threadPool, SnapshotsService snapshotsService,
IndexNameExpressionResolver indexNameExpressionResolver) {
super(CreateSnapshotAction.NAME, transportService, clusterService, threadPool, indexNameExpressionResolver, CreateSnapshotRequest::new);
super(CreateSnapshotAction.NAME, transportService, clusterService, threadPool,
CreateSnapshotRequest::new, indexNameExpressionResolver);
this.snapshotsService = snapshotsService;
}

Expand All @@ -66,52 +63,17 @@ protected ClusterBlockException checkBlock(CreateSnapshotRequest request, Cluste
if (clusterBlockException != null) {
return clusterBlockException;
}
return state.blocks().indicesBlockedException(ClusterBlockLevel.READ, indexNameExpressionResolver.concreteIndexNames(state, request));
return state.blocks()
.indicesBlockedException(ClusterBlockLevel.READ, indexNameExpressionResolver.concreteIndexNames(state, request));
}

@Override
protected void masterOperation(final CreateSnapshotRequest request, ClusterState state, final ActionListener<CreateSnapshotResponse> listener) {
final String snapshotName = indexNameExpressionResolver.resolveDateMathExpression(request.snapshot());
SnapshotsService.SnapshotRequest snapshotRequest =
new SnapshotsService.SnapshotRequest(request.repository(), snapshotName, "create_snapshot [" + snapshotName + "]")
.indices(request.indices())
.indicesOptions(request.indicesOptions())
.partial(request.partial())
.settings(request.settings())
.includeGlobalState(request.includeGlobalState())
.masterNodeTimeout(request.masterNodeTimeout());
snapshotsService.createSnapshot(snapshotRequest, new SnapshotsService.CreateSnapshotListener() {
@Override
public void onResponse() {
if (request.waitForCompletion()) {
snapshotsService.addListener(new SnapshotsService.SnapshotCompletionListener() {
@Override
public void onSnapshotCompletion(Snapshot snapshot, SnapshotInfo snapshotInfo) {
if (snapshot.getRepository().equals(request.repository()) &&
snapshot.getSnapshotId().getName().equals(snapshotName)) {
listener.onResponse(new CreateSnapshotResponse(snapshotInfo));
snapshotsService.removeListener(this);
}
}

@Override
public void onSnapshotFailure(Snapshot snapshot, Exception e) {
if (snapshot.getRepository().equals(request.repository()) &&
snapshot.getSnapshotId().getName().equals(snapshotName)) {
listener.onFailure(e);
snapshotsService.removeListener(this);
}
}
});
} else {
listener.onResponse(new CreateSnapshotResponse());
}
}

@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
});
protected void masterOperation(final CreateSnapshotRequest request, ClusterState state,
final ActionListener<CreateSnapshotResponse> listener) {
if (request.waitForCompletion()) {
snapshotsService.executeSnapshot(request, ActionListener.map(listener, CreateSnapshotResponse::new));
} else {
snapshotsService.createSnapshot(request, ActionListener.map(listener, snapshot -> new CreateSnapshotResponse()));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,13 +58,17 @@ public DeleteSnapshotRequest(String repository, String snapshot) {
this.snapshot = snapshot;
}

/**
* Constructs a new delete snapshots request with repository name
*
* @param repository repository name
*/
public DeleteSnapshotRequest(String repository) {
this.repository = repository;
public DeleteSnapshotRequest(StreamInput in) throws IOException {
super(in);
repository = in.readString();
snapshot = in.readString();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(repository);
out.writeString(snapshot);
}

@Override
Expand Down Expand Up @@ -115,15 +119,6 @@ public DeleteSnapshotRequest snapshot(String snapshot) {

@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
repository = in.readString();
snapshot = in.readString();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(repository);
out.writeString(snapshot);
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,11 @@ public class TransportDeleteSnapshotAction extends TransportMasterNodeAction<Del
private final SnapshotsService snapshotsService;

@Inject
public TransportDeleteSnapshotAction(TransportService transportService,
ClusterService clusterService,
ThreadPool threadPool,
SnapshotsService snapshotsService,
public TransportDeleteSnapshotAction(TransportService transportService, ClusterService clusterService,
ThreadPool threadPool, SnapshotsService snapshotsService,
IndexNameExpressionResolver indexNameExpressionResolver) {
super(DeleteSnapshotAction.NAME, transportService, clusterService, threadPool, indexNameExpressionResolver, DeleteSnapshotRequest::new);
super(DeleteSnapshotAction.NAME, transportService, clusterService, threadPool,
DeleteSnapshotRequest::new, indexNameExpressionResolver);
this.snapshotsService = snapshotsService;
}

Expand All @@ -65,17 +64,9 @@ protected ClusterBlockException checkBlock(DeleteSnapshotRequest request, Cluste
}

@Override
protected void masterOperation(final DeleteSnapshotRequest request, ClusterState state, final ActionListener<AcknowledgedResponse> listener) {
snapshotsService.deleteSnapshot(request.repository(), request.snapshot(), new SnapshotsService.DeleteSnapshotListener() {
@Override
public void onResponse() {
listener.onResponse(new AcknowledgedResponse(true));
}

@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
}, false);
protected void masterOperation(final DeleteSnapshotRequest request, ClusterState state,
final ActionListener<AcknowledgedResponse> listener) {
snapshotsService.deleteSnapshot(request.repository(), request.snapshot(),
ActionListener.map(listener, v -> new AcknowledgedResponse(true)), false);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,10 @@ public String toString() {
return builder.append("]").toString();
}

public boolean isEmpty() {
return entries.isEmpty();
}

/**
* Restore metadata
*/
Expand Down
Loading

0 comments on commit 38c4155

Please sign in to comment.