Skip to content

Commit

Permalink
Use GetSnapshotsResponse.Response in GroupedActionListener
Browse files Browse the repository at this point in the history
  • Loading branch information
Andrey Ershov committed May 23, 2019
1 parent 845ed5f commit c00640f
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
Expand All @@ -53,17 +54,13 @@ public class GetSnapshotsResponse extends ActionResponse implements ToXContentOb
(p, c) -> Response.fromXContent(p), new ParseField("responses"));
}

public GetSnapshotsResponse(Map<String, List<SnapshotInfo>> successfulResponses, Map<String, ElasticsearchException> failedResponses) {
this.successfulResponses = successfulResponses;
this.failedResponses = failedResponses;
}

private static class Response {
String repository;
List<SnapshotInfo> snapshots;
ElasticsearchException error;
public static class Response {
private String repository;
private List<SnapshotInfo> snapshots;
private ElasticsearchException error;

static final ConstructingObjectParser<Response, Void> RESPONSE_PARSER =
private static final ConstructingObjectParser<Response, Void> RESPONSE_PARSER =
new ConstructingObjectParser<>(Response.class.getName(), true,
(args) -> new Response((String) args[0],
(List<SnapshotInfo>) args[1], (ElasticsearchException) args[2]));
Expand All @@ -76,21 +73,29 @@ private static class Response {
(p, c) -> ElasticsearchException.fromXContent(p), new ParseField("error"));
}

Response(String repository, List<SnapshotInfo> snapshots, ElasticsearchException error) {
private Response(String repository, List<SnapshotInfo> snapshots, ElasticsearchException error) {
this.repository = repository;
this.snapshots = snapshots;
this.error = error;
}

static Response fromXContent(XContentParser parser) throws IOException {
public static Response snapshots(String repository, List<SnapshotInfo> snapshots) {
return new Response(repository, snapshots, null);
}

public static Response error(String repository, ElasticsearchException error) {
return new Response(repository, null, error);
}

private static Response fromXContent(XContentParser parser) throws IOException {
return RESPONSE_PARSER.parse(parser, null);
}
}

private Map<String, List<SnapshotInfo>> successfulResponses = Collections.emptyMap();
private Map<String, ElasticsearchException> failedResponses = Collections.emptyMap();

private GetSnapshotsResponse(List<Response> responses) {
public GetSnapshotsResponse(Collection<Response> responses) {
this.successfulResponses = new HashMap<>();
this.failedResponses = new HashMap<>();
for (Response response : responses) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,42 +106,22 @@ protected void masterOperation(final GetSnapshotsRequest request, final ClusterS

private void getMultipleReposSnapshotInfo(List<RepositoryMetaData> repos, String[] snapshots, boolean ignoreUnavailable,
boolean verbose, ActionListener<GetSnapshotsResponse> listener) {
GroupedActionListener<Tuple<String, Tuple<List<SnapshotInfo>, ElasticsearchException>>> groupedActionListener =
final GroupedActionListener<GetSnapshotsResponse.Response> groupedActionListener =
new GroupedActionListener<>(
ActionListener.map(listener, responses -> {
assert repos.size() == responses.size();

Map<String, List<SnapshotInfo>> successfulResponses = new HashMap<>();
Map<String, ElasticsearchException> failedResponses = new HashMap<>();

Iterator<Tuple<String, Tuple<List<SnapshotInfo>, ElasticsearchException>>> it = responses.iterator();

while (it.hasNext()) {
Tuple<String, Tuple<List<SnapshotInfo>, ElasticsearchException>> response = it.next();
String repo = response.v1();
Tuple<List<SnapshotInfo>, ElasticsearchException> result = response.v2();
if (result.v1() != null) {
assert result.v2() == null;
successfulResponses.put(repo, result.v1());
} else {
assert result.v2() != null;
failedResponses.put(repo, result.v2());
}
}

return new GetSnapshotsResponse(successfulResponses, failedResponses);
return new GetSnapshotsResponse(responses);
}), repos.size());

// run concurrently for all repos on GENERIC thread pool
for (final RepositoryMetaData repo : repos) {
threadPool.executor(ThreadPool.Names.GENERIC).execute(
() -> {
// Unfortunately, there is no Either in Java, so we use Tuple with only one value set
try {
groupedActionListener.onResponse(Tuple.tuple(repo.name(),
Tuple.tuple(getSingleRepoSnapshotInfo(repo.name(), snapshots, ignoreUnavailable, verbose), null)));
groupedActionListener.onResponse(GetSnapshotsResponse.Response.snapshots(
repo.name(), getSingleRepoSnapshotInfo(repo.name(), snapshots, ignoreUnavailable, verbose)));
} catch (ElasticsearchException e) {
groupedActionListener.onResponse(Tuple.tuple(repo.name(), Tuple.tuple(null, e)));
groupedActionListener.onResponse(GetSnapshotsResponse.Response.error(repo.name(), e));
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -86,22 +85,21 @@ private List<SnapshotInfo> createSnapshotInfos() {

private GetSnapshotsResponse createTestInstance() {
Set<String> repositories = new HashSet<>();
Map<String, List<SnapshotInfo>> successfulResponses = new HashMap<>();
Map<String, ElasticsearchException> failedResponses = new HashMap<>();
List<GetSnapshotsResponse.Response> responses = new ArrayList<>();

for (int i = 0; i < randomIntBetween(0, 5); i++) {
String repository = randomValueOtherThanMany(r -> repositories.contains(r), () -> randomAlphaOfLength(10));
repositories.add(repository);
successfulResponses.put(repository, createSnapshotInfos());
responses.add(GetSnapshotsResponse.Response.snapshots(repository, createSnapshotInfos()));
}

for (int i = 0; i < randomIntBetween(0, 5); i++) {
String repository = randomValueOtherThanMany(r -> repositories.contains(r), () -> randomAlphaOfLength(10));
repositories.add(repository);
failedResponses.put(repository, new ElasticsearchException(randomAlphaOfLength(10)));
responses.add(GetSnapshotsResponse.Response.error(repository, new ElasticsearchException(randomAlphaOfLength(10))));
}

return new GetSnapshotsResponse(successfulResponses, failedResponses);
return new GetSnapshotsResponse(responses);
}

public void testSerialization() throws IOException {
Expand Down

0 comments on commit c00640f

Please sign in to comment.