Skip to content

Commit

Permalink
SOLR-12708: Aggregate failures from downstream async jobs; add error …
Browse files Browse the repository at this point in the history
…handling for RestoreCmd
  • Loading branch information
manokovacs committed Sep 26, 2018
1 parent e16d7d6 commit 16558ee
Show file tree
Hide file tree
Showing 9 changed files with 212 additions and 19 deletions.
4 changes: 3 additions & 1 deletion solr/CHANGES.txt
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ Other Changes
java.time.DateTimeFormatter instead of Joda time (see upgrade notes). "Lenient" is enabled. Removed Joda Time dependency.
(David Smiley, Bar Rotstein)

* SOLR-12708: Fix async collection admin operations to report downstream failures (Mano Kovacs)

================== 7.6.0 ==================

Consult the LUCENE_CHANGES.txt file for additional, low level, changes in this release.
Expand Down Expand Up @@ -462,7 +464,7 @@ Other Changes

* SOLR-8742: In HdfsDirectoryTest replace RAMDirectory usages with ByteBuffersDirectory.
(hossman, Mark Miller, Andrzej Bialecki, Steve Rowe)

* SOLR-12771: Improve Autoscaling Policy and Preferences documentation. (hossman, Steve Rowe)

================== 7.4.0 ==================
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,8 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
COLOCATED_WITH, null));

private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
public static final String FAILURE_FIELD = "failure";
public static final String SUCCESS_FIELD = "success";

Overseer overseer;
ShardHandlerFactory shardHandlerFactory;
Expand Down Expand Up @@ -837,6 +839,15 @@ private void processResponse(NamedList results, ShardResponse srsp, Set<String>
processResponse(results, e, nodeName, solrResponse, shard, okayExceptions);
}

/**
 * Returns the {@link SimpleOrderedMap} registered in {@code results} under {@code key},
 * lazily creating and registering an empty one the first time the key is requested.
 *
 * @param results the aggregate response being built
 * @param key     entry name to look up (e.g. {@link #FAILURE_FIELD} or {@link #SUCCESS_FIELD})
 * @return the existing or newly created map; never null
 */
private SimpleOrderedMap getOrCreateMap(NamedList results, String key) {
  SimpleOrderedMap existing = (SimpleOrderedMap) results.get(key);
  if (existing != null) {
    return existing;
  }
  SimpleOrderedMap created = new SimpleOrderedMap();
  results.add(key, created);
  return created;
}

@SuppressWarnings("unchecked")
private void processResponse(NamedList results, Throwable e, String nodeName, SolrResponse solrResponse, String shard, Set<String> okayExceptions) {
String rootThrowable = null;
Expand All @@ -847,21 +858,13 @@ private void processResponse(NamedList results, Throwable e, String nodeName, So
if (e != null && (rootThrowable == null || !okayExceptions.contains(rootThrowable))) {
log.error("Error from shard: " + shard, e);

SimpleOrderedMap failure = (SimpleOrderedMap) results.get("failure");
if (failure == null) {
failure = new SimpleOrderedMap();
results.add("failure", failure);
}
SimpleOrderedMap failure = getOrCreateMap(results, FAILURE_FIELD);

failure.add(nodeName, e.getClass().getName() + ":" + e.getMessage());

} else {

SimpleOrderedMap success = (SimpleOrderedMap) results.get("success");
if (success == null) {
success = new SimpleOrderedMap();
results.add("success", success);
}
SimpleOrderedMap success = getOrCreateMap(results, SUCCESS_FIELD);

success.add(nodeName, solrResponse.getResponse());
}
Expand All @@ -871,7 +874,15 @@ private void processResponse(NamedList results, Throwable e, String nodeName, So
/**
 * Blocks until every outstanding core-admin async sub-request has completed and
 * records each outcome in {@code results}.
 * <p>
 * Each response is added twice: once flat under its request id (kept for backward
 * compatibility with pre-SOLR-12708 consumers), and once aggregated under the
 * shared {@link #FAILURE_FIELD} or {@link #SUCCESS_FIELD} map so callers can see
 * downstream failures at a glance.
 *
 * @param requestMap node name -> async request id of the sub-requests to wait on
 * @param results    aggregate response to append per-node outcomes to
 */
private void waitForAsyncCallsToComplete(Map<String, String> requestMap, NamedList results) {
  for (String k : requestMap.keySet()) {
    String requestId = requestMap.get(k);
    log.debug("I am Waiting for :{}/{}", k, requestId);
    // Wait exactly once per request; the response is reused for both result entries below.
    NamedList response = waitForCoreAdminAsyncCallToComplete(k, requestId);
    results.add(requestId, response); // backward compatibility reasons
    String msg = (String) response.get("Response");
    if ("failed".equalsIgnoreCase((String) response.get("STATUS"))) {
      log.error("Error from shard {}: {}", k, msg);
      getOrCreateMap(results, FAILURE_FIELD).add(k, msg);
    } else {
      getOrCreateMap(results, SUCCESS_FIELD).add(k, msg);
    }
  }
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.apache.solr.client.solrj.cloud.DistributedQueue;
import org.apache.solr.client.solrj.cloud.autoscaling.PolicyHelper;
Expand All @@ -51,6 +54,7 @@
import org.apache.solr.common.params.CoreAdminParams;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.SimpleOrderedMap;
import org.apache.solr.common.util.StrUtils;
import org.apache.solr.common.util.Utils;
import org.apache.solr.core.CoreContainer;
Expand Down Expand Up @@ -238,9 +242,12 @@ public void call(ClusterState state, ZkNodeProps message, NamedList results) thr
message, sliceNames,
numNrtReplicas, numTlogReplicas, numPullReplicas);
sessionWrapper = PolicyHelper.getLastSessionWrapper(true);

CountDownLatch countDownLatch = new CountDownLatch(restoreCollection.getSlices().size());

//Create one replica per shard and copy backed up data to it
for (Slice slice : restoreCollection.getSlices()) {
log.debug("Adding replica for shard={} collection={} ", slice.getName(), restoreCollection);
log.info("Adding replica for shard={} collection={} ", slice.getName(), restoreCollection);
HashMap<String, Object> propMap = new HashMap<>();
propMap.put(Overseer.QUEUE_OPERATION, CREATESHARD);
propMap.put(COLLECTION_PROP, restoreCollectionName);
Expand Down Expand Up @@ -271,7 +278,37 @@ public void call(ClusterState state, ZkNodeProps message, NamedList results) thr
propMap.put(ASYNC, asyncId);
}
ocmh.addPropertyParams(message, propMap);
ocmh.addReplica(clusterState, new ZkNodeProps(propMap), new NamedList(), null);
final NamedList addResult = new NamedList();
ocmh.addReplica(clusterState, new ZkNodeProps(propMap), addResult, () -> {
countDownLatch.countDown();
Object addResultFailure = addResult.get("failure");
if (addResultFailure != null) {
SimpleOrderedMap failure = (SimpleOrderedMap) results.get("failure");
if (failure == null) {
failure = new SimpleOrderedMap();
results.add("failure", failure);
}
failure.addAll((NamedList) addResultFailure);
} else {
SimpleOrderedMap success = (SimpleOrderedMap) results.get("success");
if (success == null) {
success = new SimpleOrderedMap();
results.add("success", success);
}
success.addAll((NamedList) addResult.get("success"));
}
});
}

boolean allIsDone = countDownLatch.await(1, TimeUnit.HOURS);
if (!allIsDone) {
throw new TimeoutException("Initial replicas were not created within 10 minutes. Timing out.");
}
Object failures = results.get("failure");
if (failures != null && ((SimpleOrderedMap) failures).size() > 0) {
log.error("Restore failed to create initial replicas.");
ocmh.cleanupCollection(restoreCollectionName, new NamedList());
return;
}

//refresh the location copy of collection state
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,8 +194,9 @@ public void handleRequestBody(SolrQueryRequest req, SolrQueryResponse rsp) throw
removeTask("running", taskObject.taskId);
if (exceptionCaught) {
addTask("failed", taskObject, true);
} else
} else {
addTask("completed", taskObject, true);
}
}
});
} finally {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
<?xml version="1.0" encoding="UTF-8" ?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<schema name="minimal" version="1.1">
<fieldType name="string" class="solr.StrField"/>
<fieldType name="int" class="${solr.tests.IntegerFieldType}" docValues="${solr.tests.numeric.dv}" precisionStep="0" omitNorms="true" positionIncrementGap="0"/>
<fieldType name="long" class="${solr.tests.LongFieldType}" docValues="${solr.tests.numeric.dv}" precisionStep="0" omitNorms="true" positionIncrementGap="0"/>
<dynamicField name="*" type="string" indexed="true" stored="true"/>
<!-- for versioning -->
<field name="_version_" type="long" indexed="true" stored="true"/>
<field name="_root_" type="string" indexed="true" stored="true" multiValued="false" required="false"/>
<field name="id" type="string" indexed="true" stored="true"/>
<dynamicField name="*_s" type="string" indexed="true" stored="true" />
<uniqueKey>id</uniqueKey>
</schema>
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
<?xml version="1.0" ?>

<!--
 Licensed to the Apache Software Foundation (ASF) under one or more
 contributor license agreements. See the NOTICE file distributed with
 this work for additional information regarding copyright ownership.
 The ASF licenses this file to You under the Apache License, Version 2.0
 (the "License"); you may not use this file except in compliance with
 the License. You may obtain a copy of the License at
 http://www.apache.org/licenses/LICENSE-2.0
 Unless required by applicable law or agreed to in writing, software
 distributed under the License is distributed on an "AS IS" BASIS,
 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 See the License for the specific language governing permissions and
 limitations under the License.
-->

<!-- Minimal solrconfig.xml with /select, /admin and /update only -->

<config>

<dataDir>${solr.data.dir:}</dataDir>

<directoryFactory name="DirectoryFactory"
class="${solr.directoryFactory:solr.NRTCachingDirectoryFactory}"/>
<schemaFactory class="ClassicIndexSchemaFactory"/>

<luceneMatchVersion>${tests.luceneMatchVersion:LATEST}</luceneMatchVersion>
<updateHandler class="solr.DirectUpdateHandler2">
<commitWithin>
<softCommit>${solr.commitwithin.softcommit:true}</softCommit>
</commitWithin>
<updateLog class="${solr.ulog:solr.UpdateLog}"></updateLog>
</updateHandler>

<requestHandler name="/select" class="solr.SearchHandler">
<lst name="defaults">
<str name="echoParams">explicit</str>
<str name="indent">true</str>
<str name="df">text</str>
</lst>

</requestHandler>

<!-- NOTE(review): this handler class appears to be intentionally nonexistent so that
     core loading fails — presumably this is the deliberately broken "confFaulty"
     configset used by the restore-failure tests; confirm before "fixing" the name. -->
<requestHandler name="/nope" class="solr.NonExistinghHandler">
</requestHandler>
</config>


Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,14 @@ public abstract class AbstractCloudBackupRestoreTestCase extends SolrCloudTestCa

protected static final int NUM_SHARDS = 2;//granted we sometimes shard split to get more
protected static final int NUM_SPLIT_SHARDS = 3; //We always split shard1 so total shards post split will be 3
protected static final String BACKUPNAME_PREFIX = "mytestbackup";

int replFactor;
int numTlogReplicas;
int numPullReplicas;

private static long docsSeed; // see indexDocs()
private String testSuffix = "test1";

@BeforeClass
public static void createCluster() throws Exception {
Expand All @@ -72,7 +74,7 @@ public static void createCluster() throws Exception {
/**
* @return The name of the collection to use.
*/
public abstract String getCollectionName();
public abstract String getCollectionNamePrefix();

/**
* @return The name of the backup repository to use.
Expand All @@ -85,8 +87,18 @@ public static void createCluster() throws Exception {
*/
public abstract String getBackupLocation();


/**
 * Builds the collection name for the current test run from the subclass-supplied
 * prefix and the per-test suffix (see {@code setTestSuffix}).
 *
 * @return {@code <prefix>_<suffix>}, e.g. {@code backuprestore_testok}
 */
public String getCollectionName(){
  return String.join("_", getCollectionNamePrefix(), testSuffix);
}

/**
 * Sets the suffix appended to the collection name, letting each test method use a
 * distinct collection (and backup name) within the shared cluster.
 *
 * @param testSuffix suffix for this test run, e.g. {@code "testok"} or {@code "testfailure"}
 */
public void setTestSuffix(String testSuffix) {
this.testSuffix = testSuffix;
}

@Test
public void test() throws Exception {
setTestSuffix("testok");
boolean isImplicit = random().nextBoolean();
boolean doSplitShardOperation = !isImplicit && random().nextBoolean();
replFactor = TestUtil.nextInt(random(), 1, 2);
Expand Down Expand Up @@ -146,6 +158,56 @@ public void test() throws Exception {
testInvalidPath(getCollectionName());
}

/**
 * Verifies that restoring a backup into a collection configured with the deliberately
 * broken "confFaulty" configset reports FAILED (rather than hanging or silently
 * succeeding) — the error-propagation path added by SOLR-12708.
 *
 * @throws Exception on any cluster/admin-request error
 */
@Test
public void testRestoreFailure() throws Exception {
  setTestSuffix("testfailure");
  replFactor = TestUtil.nextInt(random(), 1, 2);
  numTlogReplicas = TestUtil.nextInt(random(), 0, 1);
  numPullReplicas = TestUtil.nextInt(random(), 0, 1);

  CollectionAdminRequest.Create create =
      CollectionAdminRequest.createCollection(getCollectionName(), "conf1", NUM_SHARDS, replFactor, numTlogReplicas, numPullReplicas);

  int replicasPerShard = replFactor + numTlogReplicas + numPullReplicas;
  if (NUM_SHARDS * replicasPerShard > cluster.getJettySolrRunners().size()) {
    // Cast to double before dividing: int/int truncates BEFORE Math.ceil runs,
    // which would make the ceil a no-op and under-provision maxShardsPerNode by one.
    create.setMaxShardsPerNode((int) Math.ceil((double) NUM_SHARDS * replicasPerShard / cluster.getJettySolrRunners().size()));
  }

  CloudSolrClient solrClient = cluster.getSolrClient();
  create.process(solrClient);

  indexDocs(getCollectionName(), false);

  String backupLocation = getBackupLocation();
  String backupName = BACKUPNAME_PREFIX + testSuffix;

  DocCollection backupCollection = solrClient.getZkStateReader().getClusterState().getCollection(getCollectionName());

  log.info("Triggering Backup command");

  {
    CollectionAdminRequest.Backup backup = CollectionAdminRequest.backupCollection(getCollectionName(), backupName)
        .setLocation(backupLocation).setRepositoryName(getBackupRepoName());
    assertEquals(0, backup.process(solrClient).getStatus());
  }

  log.info("Triggering Restore command");

  String restoreCollectionName = getCollectionName() + "_restored";

  {
    CollectionAdminRequest.Restore restore = CollectionAdminRequest.restoreCollection(restoreCollectionName, backupName)
        .setLocation(backupLocation).setRepositoryName(getBackupRepoName());
    if (backupCollection.getReplicas().size() > cluster.getJettySolrRunners().size()) {
      // may need to increase maxShardsPerNode (e.g. if it was shard split, then now we need more);
      // same double-cast as above so Math.ceil sees a fractional quotient
      restore.setMaxShardsPerNode((int) Math.ceil((double) backupCollection.getReplicas().size() / cluster.getJettySolrRunners().size()));
    }

    // "confFaulty" is a configset that cannot load, so the async restore must report FAILED.
    restore.setConfigName("confFaulty");
    assertEquals(RequestStatusState.FAILED, restore.processAndWait(solrClient, 30));
  }
}

/**
* This test validates the backup of collection configuration using
* {@linkplain CollectionAdminParams#NO_INDEX_BACKUP_STRATEGY}.
Expand Down Expand Up @@ -226,7 +288,7 @@ private int indexDocs(String collectionName, boolean useUUID) throws Exception {

private void testBackupAndRestore(String collectionName, int backupReplFactor) throws Exception {
String backupLocation = getBackupLocation();
String backupName = "mytestbackup";
String backupName = BACKUPNAME_PREFIX + testSuffix;

CloudSolrClient client = cluster.getSolrClient();
DocCollection backupCollection = client.getZkStateReader().getClusterState().getCollection(collectionName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ public static void teardownClass() throws Exception {
}

@Override
public String getCollectionName() {
public String getCollectionNamePrefix() {
return "hdfsbackuprestore";
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ public class TestLocalFSCloudBackupRestore extends AbstractCloudBackupRestoreTes
public static void setupClass() throws Exception {
configureCluster(NUM_SHARDS)// nodes
.addConfig("conf1", TEST_PATH().resolve("configsets").resolve("cloud-minimal").resolve("conf"))
.addConfig("confFaulty", TEST_PATH().resolve("configsets").resolve("cloud-minimal-faulty").resolve("conf"))
.configure();

boolean whitespacesInPath = random().nextBoolean();
Expand All @@ -43,7 +44,7 @@ public static void setupClass() throws Exception {
}

@Override
public String getCollectionName() {
public String getCollectionNamePrefix() {
return "backuprestore";
}

Expand Down

0 comments on commit 16558ee

Please sign in to comment.