Skip to content

Commit

Permalink
HDFS-16689. Standby NameNode crashes when transitioning to Active wit…
Browse files Browse the repository at this point in the history
…h in-progress tailer
  • Loading branch information
zengqiang.xu committed Oct 6, 2022
1 parent a708ff9 commit 8cd18a1
Show file tree
Hide file tree
Showing 9 changed files with 294 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1657,15 +1657,14 @@ synchronized void logEdit(final int length, final byte[] data) {
/**
* Run recovery on all journals to recover any unclosed segments
*/
synchronized void recoverUnclosedStreams() {
synchronized void recoverUnclosedStreams() throws IOException {
Preconditions.checkState(
state == State.BETWEEN_LOG_SEGMENTS,
"May not recover segments - wrong state: %s", state);
try {
journalSet.recoverUnfinalizedSegments();
} catch (IOException ex) {
// All journals have failed, it is handled in logSync.
// TODO: are we sure this is OK?
throw new UnrecoverEditsException("Failed recover unclosed segments", ex);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.util.ShutdownHookManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -174,6 +175,11 @@ protected FSImage(Configuration conf,
archivalManager = new NNStorageRetentionManager(conf, storage, editLog);
FSImageFormatProtobuf.initParallelLoad(conf);
}

@VisibleForTesting
void setEditLog(FSEditLog editLog) {
this.editLog = editLog;
}

void format(FSNamesystem fsn, String clusterId, boolean force)
throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.VersionInfo;

import static org.apache.hadoop.util.ExitUtil.terminate;
import static org.apache.hadoop.util.Time.now;
import static org.apache.hadoop.util.Time.monotonicNow;
import static org.apache.hadoop.hdfs.server.namenode.top.metrics.TopMetrics.TOPMETRICS_METRICS_SOURCE_NAME;
Expand Down Expand Up @@ -1286,6 +1287,8 @@ void loadFSImage(StartupOption startOpt) throws IOException {
fsImage.openEditLogForWrite(getEffectiveLayoutVersion());
}
success = true;
} catch (UnrecoverEditsException e) {
terminate(1, e.getLocalizedMessage());
} finally {
if (!success) {
fsImage.close();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;

import java.io.IOException;

/**
* This exception is thrown when failed recover unclosed segments.
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class UnrecoverEditsException extends IOException {
/** for java.io.Serializable. */
private static final long serialVersionUID = 1L;

public UnrecoverEditsException(String msg, IOException e) {
super(msg, e);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ public void stop() throws IOException {
}

@VisibleForTesting
FSEditLog getEditLog() {
public FSEditLog getEditLog() {
return editLog;
}

Expand Down Expand Up @@ -311,7 +311,8 @@ public Void run() throws Exception {
startTime - lastLoadTimeMs);
// It is already under the name system lock and the checkpointer
// thread is already stopped. No need to acquire any other lock.
editsTailed = doTailEdits();
// HDFS-16689. Disable inProgress to use the streaming mechanism
editsTailed = doTailEdits(false);
} catch (InterruptedException e) {
throw new IOException(e);
} finally {
Expand All @@ -323,9 +324,13 @@ public Void run() throws Exception {
}
});
}

@VisibleForTesting
public long doTailEdits() throws IOException, InterruptedException {
return doTailEdits(inProgressOk);
}

private long doTailEdits(boolean enableInProgress) throws IOException, InterruptedException {
Collection<EditLogInputStream> streams;
FSImage image = namesystem.getFSImage();

Expand All @@ -334,7 +339,7 @@ public long doTailEdits() throws IOException, InterruptedException {
long startTime = timer.monotonicNow();
try {
streams = editLog.selectInputStreams(lastTxnId + 1, 0,
null, inProgressOk, true);
null, enableInProgress, true);
} catch (IOException ioe) {
// This is acceptable. If we try to tail edits in the middle of an edits
// log roll, i.e. the last one has been finalized but the new inprogress
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,8 +198,7 @@ public void testMismatchedNNIsRejected() throws Exception {
.build();
fail("New NN with different namespace should have been rejected");
} catch (ExitException ee) {
GenericTestUtils.assertExceptionContains(
"Unable to start log segment 1: too few journals", ee);
GenericTestUtils.assertExceptionContains("Failed recover unclosed segments", ee);
assertTrue("Didn't terminate properly ", ExitUtil.terminateCalled());
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.qjournal.client;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournaledEditsResponseProto;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListenableFuture;
import org.mockito.Mockito;
import org.mockito.stubbing.Answer;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URI;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Semaphore;

/**
* One Util class to mock QJM for some UTs not in this package.
*/
public final class SpyQJournalUtil {

private SpyQJournalUtil() {
}

/**
* Mock a QuorumJournalManager with input uri, nsInfo and namServiceId.
* @param conf input configuration.
* @param uri input uri.
* @param nsInfo input nameservice info.
* @param nameServiceId input nameservice Id.
* @return one mocked QuorumJournalManager.
* @throws IOException throw IOException.
*/
public static QuorumJournalManager createSpyingQJM(Configuration conf,
URI uri, NamespaceInfo nsInfo, String nameServiceId) throws IOException {
AsyncLogger.Factory spyFactory = new AsyncLogger.Factory() {
@Override
public AsyncLogger createLogger(Configuration conf, NamespaceInfo nsInfo,
String journalId, String nameServiceId, InetSocketAddress addr) {
AsyncLogger logger = new IPCLoggerChannel(conf, nsInfo, journalId,
nameServiceId, addr) {
protected ExecutorService createSingleThreadExecutor() {
// Don't parallelize calls to the quorum in the tests.
// This makes the tests more deterministic.
return new DirectExecutorService();
}
};
return Mockito.spy(logger);
}
};
return new QuorumJournalManager(conf, uri, nsInfo, nameServiceId, spyFactory);
}

/**
* Mock Journals with different response for getJournaledEdits rpc with the input startTxid.
* 1. First journal with one empty response.
* 2. Second journal with one normal response.
* 3. Third journal with one slow response.
* @param manager input QuorumJournalManager.
* @param startTxid input start txid.
*/
public static void mockJNWithEmptyOrSlowResponse(QuorumJournalManager manager, long startTxid) {
List<AsyncLogger> spies = manager.getLoggerSetForTests().getLoggersForTests();
Semaphore semaphore = new Semaphore(0);

// Mock JN0 return an empty response.
Mockito.doAnswer(invocation -> {
semaphore.release();
return GetJournaledEditsResponseProto.newBuilder().setTxnCount(0).build();
}).when(spies.get(0))
.getJournaledEdits(startTxid, QuorumJournalManager.QJM_RPC_MAX_TXNS_DEFAULT);

// Mock JN1 return a normal response.
spyGetJournaledEdits(spies, 1, startTxid, () -> semaphore.release(1));

// Mock JN2 return a slow response
spyGetJournaledEdits(spies, 2, startTxid, () -> semaphore.acquireUninterruptibly(2));
}

public static void spyGetJournaledEdits(List<AsyncLogger> spies,
int jnSpyIdx, long fromTxId, Runnable preHook) {
Mockito.doAnswer((Answer<ListenableFuture<GetJournaledEditsResponseProto>>) invocation -> {
preHook.run();
@SuppressWarnings("unchecked")
ListenableFuture<GetJournaledEditsResponseProto> result =
(ListenableFuture<GetJournaledEditsResponseProto>) invocation.callRealMethod();
return result;
}).when(spies.get(jnSpyIdx)).getJournaledEdits(fromTxId,
QuorumJournalManager.QJM_RPC_MAX_TXNS_DEFAULT);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import static org.apache.hadoop.hdfs.qjournal.QJMTestUtil.verifyEdits;
import static org.apache.hadoop.hdfs.qjournal.QJMTestUtil.writeSegment;
import static org.apache.hadoop.hdfs.qjournal.QJMTestUtil.writeTxns;
import static org.apache.hadoop.hdfs.qjournal.client.SpyQJournalUtil.spyGetJournaledEdits;
import static org.apache.hadoop.hdfs.qjournal.client.TestQuorumJournalManagerUnit.futureThrows;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
Expand All @@ -34,12 +35,10 @@
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
Expand All @@ -59,7 +58,6 @@
import org.apache.hadoop.hdfs.qjournal.MiniJournalCluster;
import org.apache.hadoop.hdfs.qjournal.QJMTestUtil;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.SegmentStateProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournaledEditsResponseProto;
import org.apache.hadoop.hdfs.qjournal.server.JournalFaultInjector;
import org.apache.hadoop.hdfs.qjournal.server.JournalNode;
import org.apache.hadoop.hdfs.server.namenode.EditLogInputStream;
Expand All @@ -68,7 +66,6 @@
import org.apache.hadoop.hdfs.server.namenode.FileJournalManager.EditLogFile;
import org.apache.hadoop.hdfs.server.namenode.NNStorage;
import org.apache.hadoop.hdfs.server.namenode.NameNodeLayoutVersion;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ipc.ProtobufRpcEngine2;
import org.apache.hadoop.test.GenericTestUtils;
Expand Down Expand Up @@ -1135,9 +1132,9 @@ public void testSelectViaRPCAfterJNJitter() throws Exception {
writeTxns(stm, 21, 20);

Semaphore semaphore = new Semaphore(0);
spyGetJournaledEdits(0, 21, () -> semaphore.release(1));
spyGetJournaledEdits(1, 21, () -> semaphore.release(1));
spyGetJournaledEdits(2, 21, () -> semaphore.acquireUninterruptibly(2));
spyGetJournaledEdits(spies, 0, 21, () -> semaphore.release(1));
spyGetJournaledEdits(spies, 1, 21, () -> semaphore.release(1));
spyGetJournaledEdits(spies, 2, 21, () -> semaphore.acquireUninterruptibly(2));

List<EditLogInputStream> streams = new ArrayList<>();
qjm.selectInputStreams(streams, 21, true, true);
Expand All @@ -1147,17 +1144,6 @@ public void testSelectViaRPCAfterJNJitter() throws Exception {
assertEquals(40, streams.get(0).getLastTxId());
}

private void spyGetJournaledEdits(int jnSpyIdx, long fromTxId, Runnable preHook) {
Mockito.doAnswer((Answer<ListenableFuture<GetJournaledEditsResponseProto>>) invocation -> {
preHook.run();
@SuppressWarnings("unchecked")
ListenableFuture<GetJournaledEditsResponseProto> result =
(ListenableFuture<GetJournaledEditsResponseProto>) invocation.callRealMethod();
return result;
}).when(spies.get(jnSpyIdx)).getJournaledEdits(fromTxId,
QuorumJournalManager.QJM_RPC_MAX_TXNS_DEFAULT);
}

@Test
public void testSelectViaRpcAfterJNRestart() throws Exception {
EditLogOutputStream stm =
Expand Down Expand Up @@ -1210,27 +1196,10 @@ public void testGetJournalAddressListWithResolution() throws Exception {
// expected
}
}

private QuorumJournalManager createSpyingQJM()
throws IOException, URISyntaxException {
AsyncLogger.Factory spyFactory = new AsyncLogger.Factory() {
@Override
public AsyncLogger createLogger(Configuration conf, NamespaceInfo nsInfo,
String journalId, String nameServiceId, InetSocketAddress addr) {
AsyncLogger logger = new IPCLoggerChannel(conf, nsInfo, journalId,
nameServiceId, addr) {
protected ExecutorService createSingleThreadExecutor() {
// Don't parallelize calls to the quorum in the tests.
// This makes the tests more deterministic.
return new DirectExecutorService();
}
};

return Mockito.spy(logger);
}
};
return closeLater(new QuorumJournalManager(
conf, cluster.getQuorumJournalURI(JID), FAKE_NSINFO, spyFactory));

private QuorumJournalManager createSpyingQJM() throws IOException {
return closeLater(SpyQJournalUtil.createSpyingQJM(
conf, cluster.getQuorumJournalURI(JID), FAKE_NSINFO, null));
}

private static void waitForAllPendingCalls(AsyncLoggerSet als)
Expand Down
Loading

0 comments on commit 8cd18a1

Please sign in to comment.