Skip to content

Commit

Permalink
HDFS-16689. Standby NameNode crashes when transitioning to Active wit…
Browse files Browse the repository at this point in the history
…h in-progress tailer (#4744)

Signed-off-by: Erik Krogen <[email protected]>
Co-authored-by: zengqiang.xu <[email protected]>
  • Loading branch information
ZanderXu and zengqiang.xu authored Dec 21, 2022
1 parent b63b777 commit 15b52fb
Show file tree
Hide file tree
Showing 7 changed files with 264 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1654,18 +1654,31 @@ synchronized void logEdit(final int length, final byte[] data) {
endTransaction(start);
}

/**
* Run recovery on all journals without terminating on failure.
* Delegates to {@code recoverUnclosedStreams(false)}, so a recovery failure
* is propagated to the caller as an IOException.
*
* @throws IOException if recovering the unfinalized segments fails.
*/
void recoverUnclosedStreams() throws IOException {
recoverUnclosedStreams(false);
}

/**
* Run recovery on all journals to recover any unclosed segments.
* <p>
* The edit log must be in the BETWEEN_LOG_SEGMENTS state, otherwise the
* precondition check fails.
*
* @param terminateOnFailure if true, a recovery failure is logged, the
* journal set is cleaned up under journalSetLock and terminate(1, msg) is
* invoked; if false, the IOException is rethrown to the caller.
* @throws IOException if recovery fails and terminateOnFailure is false.
*/
synchronized void recoverUnclosedStreams() {
synchronized void recoverUnclosedStreams(boolean terminateOnFailure) throws IOException {
Preconditions.checkState(
state == State.BETWEEN_LOG_SEGMENTS,
"May not recover segments - wrong state: %s", state);
try {
journalSet.recoverUnfinalizedSegments();
} catch (IOException ex) {
// All journals have failed, it is handled in logSync.
// TODO: are we sure this is OK?
if (terminateOnFailure) {
final String msg = "Unable to recover log segments: "
+ "too few journals successfully recovered.";
LOG.error(msg, ex);
// Close the journals before terminating.
synchronized (journalSetLock) {
IOUtils.cleanupWithLogger(LOG, journalSet);
}
terminate(1, msg);
} else {
// Caller asked to handle the failure itself.
throw ex;
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1389,7 +1389,7 @@ void startActiveServices() throws IOException {
// During startup, we're already open for write during initialization.
editLog.initJournalsForWrite();
// May need to recover
editLog.recoverUnclosedStreams();
editLog.recoverUnclosedStreams(true);

LOG.info("Catching up to latest edits from old active before " +
"taking over writer role in edits logs");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,8 @@ public Void run() throws Exception {
startTime - lastLoadTimeMs);
// It is already under the name system lock and the checkpointer
// thread is already stopped. No need to acquire any other lock.
editsTailed = doTailEdits();
// HDFS-16689. Disable inProgress to use the streaming mechanism
editsTailed = doTailEdits(false);
} catch (InterruptedException e) {
throw new IOException(e);
} finally {
Expand All @@ -323,9 +324,13 @@ public Void run() throws Exception {
}
});
}

/**
* Tail edits using this tailer's inProgressOk setting.
* Delegates to {@code doTailEdits(boolean)}.
*
* @return the value returned by {@code doTailEdits(boolean)}
* (presumably the number of edits tailed - confirm against that method).
* @throws IOException propagated from {@code doTailEdits(boolean)}.
* @throws InterruptedException propagated from {@code doTailEdits(boolean)}.
*/
@VisibleForTesting
public long doTailEdits() throws IOException, InterruptedException {
return doTailEdits(inProgressOk);
}

private long doTailEdits(boolean enableInProgress) throws IOException, InterruptedException {
Collection<EditLogInputStream> streams;
FSImage image = namesystem.getFSImage();

Expand All @@ -334,7 +339,7 @@ public long doTailEdits() throws IOException, InterruptedException {
long startTime = timer.monotonicNow();
try {
streams = editLog.selectInputStreams(lastTxnId + 1, 0,
null, inProgressOk, true);
null, enableInProgress, true);
} catch (IOException ioe) {
// This is acceptable. If we try to tail edits in the middle of an edits
// log roll, i.e. the last one has been finalized but the new inprogress
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.ExitUtil;
import org.apache.hadoop.util.ExitUtil.ExitException;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
Expand Down Expand Up @@ -197,10 +196,9 @@ public void testMismatchedNNIsRejected() throws Exception {
.manageNameDfsDirs(false).format(false).checkExitOnShutdown(false)
.build();
fail("New NN with different namespace should have been rejected");
} catch (ExitException ee) {
} catch (IOException ioe) {
GenericTestUtils.assertExceptionContains(
"Unable to start log segment 1: too few journals", ee);
assertTrue("Didn't terminate properly ", ExitUtil.terminateCalled());
"recoverUnfinalizedSegments failed for too many journals", ioe);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.qjournal.client;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournaledEditsResponseProto;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListenableFuture;
import org.mockito.Mockito;
import org.mockito.stubbing.Answer;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URI;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Semaphore;

/**
 * Test utility for building a {@link QuorumJournalManager} whose loggers are
 * Mockito spies, and for stubbing the getJournaledEdits RPC on those loggers.
 * It is public so that unit tests outside this package can use it.
 */
public final class SpyQJournalUtil {

  private SpyQJournalUtil() {
    // Static utility class; not meant to be instantiated.
  }

  /**
   * Create a QuorumJournalManager whose loggers are Mockito spies of
   * {@link IPCLoggerChannel}, each using a {@link DirectExecutorService}
   * instead of its own single-thread executor.
   * @param conf input configuration.
   * @param uri input uri.
   * @param nsInfo input nameservice info.
   * @param nameServiceId input nameservice Id.
   * @return one QuorumJournalManager backed by spied loggers.
   * @throws IOException if the QuorumJournalManager cannot be constructed.
   */
  public static QuorumJournalManager createSpyingQJM(Configuration conf,
      URI uri, NamespaceInfo nsInfo, String nameServiceId) throws IOException {
    AsyncLogger.Factory spyFactory = new AsyncLogger.Factory() {
      @Override
      public AsyncLogger createLogger(Configuration conf, NamespaceInfo nsInfo,
          String journalId, String nameServiceId, InetSocketAddress addr) {
        AsyncLogger logger = new IPCLoggerChannel(conf, nsInfo, journalId,
            nameServiceId, addr) {
          @Override
          protected ExecutorService createSingleThreadExecutor() {
            // Don't parallelize calls to the quorum in the tests.
            // This makes the tests more deterministic.
            return new DirectExecutorService();
          }
        };
        return Mockito.spy(logger);
      }
    };
    return new QuorumJournalManager(conf, uri, nsInfo, nameServiceId, spyFactory);
  }

  /**
   * Mock Journals with different response for getJournaledEdits rpc with the
   * input startTxid.
   * 1. First journal with one empty response.
   * 2. Second journal with one normal response.
   * 3. Third journal with one slow response: it blocks until both the first
   *    and the second journal have been invoked.
   * The manager must have been built with spied loggers, e.g. through
   * {@link #createSpyingQJM}.
   * @param manager input QuorumJournalManager.
   * @param startTxid input start txid.
   */
  public static void mockJNWithEmptyOrSlowResponse(QuorumJournalManager manager, long startTxid) {
    List<AsyncLogger> spies = manager.getLoggerSetForTests().getLoggersForTests();
    // Starts with no permits: JN0 and JN1 each release one, JN2 waits for two.
    Semaphore semaphore = new Semaphore(0);

    // Mock JN0 return an empty response.
    // NOTE(review): the stubs installed by spyGetJournaledEdits return a
    // ListenableFuture, whereas this answer returns the bare proto - confirm
    // this matches the return type of getJournaledEdits.
    Mockito.doAnswer(invocation -> {
      semaphore.release();
      return GetJournaledEditsResponseProto.newBuilder().setTxnCount(0).build();
    }).when(spies.get(0))
        .getJournaledEdits(startTxid, QuorumJournalManager.QJM_RPC_MAX_TXNS_DEFAULT);

    // Mock JN1 return a normal response.
    spyGetJournaledEdits(spies, 1, startTxid, () -> semaphore.release(1));

    // Mock JN2 return a slow response
    spyGetJournaledEdits(spies, 2, startTxid, () -> semaphore.acquireUninterruptibly(2));
  }

  /**
   * Stub getJournaledEdits on one spied logger so that the given hook runs
   * first and the real method is called afterwards. The stub only matches
   * calls with the given fromTxId and QJM_RPC_MAX_TXNS_DEFAULT.
   * @param spies spied loggers.
   * @param jnSpyIdx index into spies of the logger to stub.
   * @param fromTxId start txid the stub applies to.
   * @param preHook action to run before the real method is called.
   */
  public static void spyGetJournaledEdits(List<AsyncLogger> spies,
      int jnSpyIdx, long fromTxId, Runnable preHook) {
    Mockito.doAnswer((Answer<ListenableFuture<GetJournaledEditsResponseProto>>) invocation -> {
      preHook.run();
      @SuppressWarnings("unchecked")
      ListenableFuture<GetJournaledEditsResponseProto> result =
          (ListenableFuture<GetJournaledEditsResponseProto>) invocation.callRealMethod();
      return result;
    }).when(spies.get(jnSpyIdx)).getJournaledEdits(fromTxId,
        QuorumJournalManager.QJM_RPC_MAX_TXNS_DEFAULT);
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import static org.apache.hadoop.hdfs.qjournal.QJMTestUtil.verifyEdits;
import static org.apache.hadoop.hdfs.qjournal.QJMTestUtil.writeSegment;
import static org.apache.hadoop.hdfs.qjournal.QJMTestUtil.writeTxns;
import static org.apache.hadoop.hdfs.qjournal.client.SpyQJournalUtil.spyGetJournaledEdits;
import static org.apache.hadoop.hdfs.qjournal.client.TestQuorumJournalManagerUnit.futureThrows;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
Expand All @@ -34,12 +35,10 @@
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
Expand All @@ -59,7 +58,6 @@
import org.apache.hadoop.hdfs.qjournal.MiniJournalCluster;
import org.apache.hadoop.hdfs.qjournal.QJMTestUtil;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.SegmentStateProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournaledEditsResponseProto;
import org.apache.hadoop.hdfs.qjournal.server.JournalFaultInjector;
import org.apache.hadoop.hdfs.qjournal.server.JournalNode;
import org.apache.hadoop.hdfs.server.namenode.EditLogInputStream;
Expand All @@ -68,7 +66,6 @@
import org.apache.hadoop.hdfs.server.namenode.FileJournalManager.EditLogFile;
import org.apache.hadoop.hdfs.server.namenode.NNStorage;
import org.apache.hadoop.hdfs.server.namenode.NameNodeLayoutVersion;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ipc.ProtobufRpcEngine2;
import org.apache.hadoop.test.GenericTestUtils;
Expand Down Expand Up @@ -1135,9 +1132,9 @@ public void testSelectViaRPCAfterJNJitter() throws Exception {
writeTxns(stm, 21, 20);

Semaphore semaphore = new Semaphore(0);
spyGetJournaledEdits(0, 21, () -> semaphore.release(1));
spyGetJournaledEdits(1, 21, () -> semaphore.release(1));
spyGetJournaledEdits(2, 21, () -> semaphore.acquireUninterruptibly(2));
spyGetJournaledEdits(spies, 0, 21, () -> semaphore.release(1));
spyGetJournaledEdits(spies, 1, 21, () -> semaphore.release(1));
spyGetJournaledEdits(spies, 2, 21, () -> semaphore.acquireUninterruptibly(2));

List<EditLogInputStream> streams = new ArrayList<>();
qjm.selectInputStreams(streams, 21, true, true);
Expand All @@ -1147,17 +1144,6 @@ public void testSelectViaRPCAfterJNJitter() throws Exception {
assertEquals(40, streams.get(0).getLastTxId());
}

private void spyGetJournaledEdits(int jnSpyIdx, long fromTxId, Runnable preHook) {
Mockito.doAnswer((Answer<ListenableFuture<GetJournaledEditsResponseProto>>) invocation -> {
preHook.run();
@SuppressWarnings("unchecked")
ListenableFuture<GetJournaledEditsResponseProto> result =
(ListenableFuture<GetJournaledEditsResponseProto>) invocation.callRealMethod();
return result;
}).when(spies.get(jnSpyIdx)).getJournaledEdits(fromTxId,
QuorumJournalManager.QJM_RPC_MAX_TXNS_DEFAULT);
}

@Test
public void testSelectViaRpcAfterJNRestart() throws Exception {
EditLogOutputStream stm =
Expand Down Expand Up @@ -1210,27 +1196,10 @@ public void testGetJournalAddressListWithResolution() throws Exception {
// expected
}
}

private QuorumJournalManager createSpyingQJM()
throws IOException, URISyntaxException {
AsyncLogger.Factory spyFactory = new AsyncLogger.Factory() {
@Override
public AsyncLogger createLogger(Configuration conf, NamespaceInfo nsInfo,
String journalId, String nameServiceId, InetSocketAddress addr) {
AsyncLogger logger = new IPCLoggerChannel(conf, nsInfo, journalId,
nameServiceId, addr) {
protected ExecutorService createSingleThreadExecutor() {
// Don't parallelize calls to the quorum in the tests.
// This makes the tests more deterministic.
return new DirectExecutorService();
}
};

return Mockito.spy(logger);
}
};
return closeLater(new QuorumJournalManager(
conf, cluster.getQuorumJournalURI(JID), FAKE_NSINFO, spyFactory));

/**
* Create a QuorumJournalManager through SpyQJournalUtil.createSpyingQJM,
* using this test's conf, the cluster's quorum journal URI for JID,
* FAKE_NSINFO and a null nameServiceId, and pass it through closeLater
* (presumably so it is closed when the test finishes - confirm).
*
* @return the QuorumJournalManager returned by closeLater.
* @throws IOException if the QuorumJournalManager cannot be created.
*/
private QuorumJournalManager createSpyingQJM() throws IOException {
return closeLater(SpyQJournalUtil.createSpyingQJM(
conf, cluster.getQuorumJournalURI(JID), FAKE_NSINFO, null));
}

private static void waitForAllPendingCalls(AsyncLoggerSet als)
Expand Down
Loading

0 comments on commit 15b52fb

Please sign in to comment.