HBASE-27632 Refactor WAL.Reader implementation so we can better suppo… #5055

Merged 2 commits on Mar 10, 2023
@@ -26,11 +26,9 @@
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.io.crypto.KeyProviderForTesting;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.regionserver.wal.SecureProtobufLogReader;
import org.apache.hadoop.hbase.regionserver.wal.SecureProtobufLogWriter;
import org.apache.hadoop.hbase.testclassification.IntegrationTests;
import org.apache.hadoop.hbase.util.EncryptionTest;
import org.apache.hadoop.hbase.wal.WAL.Reader;
import org.apache.hadoop.hbase.wal.WALProvider.Writer;
import org.apache.hadoop.util.ToolRunner;
import org.junit.Before;
@@ -54,8 +52,6 @@ public void setUpCluster() throws Exception {
conf.setInt(HFile.FORMAT_VERSION_KEY, 3);
conf.set(HConstants.CRYPTO_KEYPROVIDER_CONF_KEY, KeyProviderForTesting.class.getName());
conf.set(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY, "hbase");
conf.setClass("hbase.regionserver.hlog.reader.impl", SecureProtobufLogReader.class,
Reader.class);
conf.setClass("hbase.regionserver.hlog.writer.impl", SecureProtobufLogWriter.class,
Writer.class);
conf.setBoolean(HConstants.ENABLE_WAL_ENCRYPTION, true);
@@ -32,12 +32,14 @@
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.hbase.util.LeaseNotRecoveredException;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WAL.Reader;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.hbase.wal.WALStreamReader;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
@@ -135,7 +137,7 @@ public String toString() {
* HLogInputFormat.
*/
static abstract class WALRecordReader<K extends WALKey> extends RecordReader<K, WALEdit> {
private Reader reader = null;
private WALStreamReader reader = null;
// visible until we can remove the deprecated HLogInputFormat
Entry currentEntry = new Entry();
private long startTime;
@@ -144,6 +146,47 @@ static abstract class WALRecordReader<K extends WALKey> extends RecordReader<K,
private Path logFile;
private long currentPos;

@edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "DCN_NULLPOINTER_EXCEPTION",
justification = "HDFS-4380")
private WALStreamReader openReader(Path path, long startPosition) throws IOException {
long retryInterval = 2000; // 2 sec
int maxAttempts = 30;
int attempt = 0;
Exception ee = null;
WALStreamReader reader = null;
while (reader == null && attempt++ < maxAttempts) {
try {
// Detect if this is a new file, if so get a new reader else
// reset the current reader so that we see the new data
reader =
WALFactory.createStreamReader(path.getFileSystem(conf), path, conf, startPosition);
return reader;
} catch (LeaseNotRecoveredException lnre) {
// HBASE-15019 the WAL was not closed due to some hiccup.
LOG.warn("Try to recover the WAL lease " + path, lnre);
AbstractFSWALProvider.recoverLease(conf, path);
reader = null;
ee = lnre;
} catch (NullPointerException npe) {
// Workaround for race condition in HDFS-4380
// which throws a NPE if we open a file before any data node has the most recent block
// Just sleep and retry. Will require re-reading compressed WALs for compressionContext.
LOG.warn("Got NPE opening reader, will retry.");
reader = null;
ee = npe;
}
if (reader == null) {
// sleep before next attempt
try {
Thread.sleep(retryInterval);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
throw new IOException("Could not open reader", ee);
}

@Override
public void initialize(InputSplit split, TaskAttemptContext context)
throws IOException, InterruptedException {
@@ -158,8 +201,7 @@ public void initialize(InputSplit split, TaskAttemptContext context)

private void openReader(Path path) throws IOException {
closeReader();
reader = AbstractFSWALProvider.openReader(path, conf);
seek();
reader = openReader(path, currentPos > 0 ? currentPos : -1);
setCurrentPath(path);
}

@@ -174,12 +216,6 @@ private void closeReader() throws IOException {
}
}

private void seek() throws IOException {
if (currentPos != 0) {
reader.seek(currentPos);
}
}

@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
if (reader == null) {
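For context, a minimal, self-contained sketch of the read pattern the new openReader above relies on: instead of opening a reader and then calling seek(), the start position is handed to WALFactory.createStreamReader directly. The factory and reader APIs are the ones shown in this diff; the class and method names of the sketch itself are placeholders, and the lease-recovery retry loop is omitted.

```java
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALStreamReader;

public final class WalStreamReadSketch {
  /**
   * Read all entries of a WAL file starting at the given position
   * (-1 is what the code above passes when there is no saved position).
   */
  static void dumpFrom(Configuration conf, Path walPath, long startPosition) throws IOException {
    FileSystem fs = walPath.getFileSystem(conf);
    // The start position is passed at creation time, so no separate seek() call is needed.
    try (WALStreamReader reader =
      WALFactory.createStreamReader(fs, walPath, conf, startPosition)) {
      for (WAL.Entry entry; (entry = reader.next()) != null;) {
        // Each entry carries a WALKey and a WALEdit.
        System.out.println(entry.getKey() + ": " + entry.getEdit().size() + " cell(s)");
      }
    }
  }
}
```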
@@ -271,6 +271,9 @@ private static HRegion open(Configuration conf, TableDescriptor td, RegionInfo r
WAL wal = createWAL(walFactory, walRoller, serverName, walFs, walRootDir, regionInfo);
conf.set(HRegion.SPECIAL_RECOVERED_EDITS_DIR,
replayEditsDir.makeQualified(walFs.getUri(), walFs.getWorkingDirectory()).toString());
// we do not do WAL splitting here so it is possible to have uncleanly closed WAL files, so we
// need to ignore EOFException.
conf.setBoolean(HRegion.RECOVERED_EDITS_IGNORE_EOF, true);
return HRegion.openHRegionFromTableDir(conf, fs, tableDir, regionInfo, td, wal, null, null);
}

@@ -37,6 +37,7 @@
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.hbase.wal.WALPrettyPrinter;
import org.apache.hadoop.hbase.wal.WALStreamReader;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;

@@ -88,7 +89,7 @@ protected void processOptions(CommandLine cmd) {
protected int doWork() throws Exception {
Path path = new Path(file);
FileSystem fs = path.getFileSystem(conf);
try (WAL.Reader reader = WALFactory.createReader(fs, path, conf)) {
try (WALStreamReader reader = WALFactory.createStreamReader(fs, path, conf)) {
for (;;) {
WAL.Entry entry = reader.next();
if (entry == null) {
@@ -182,6 +182,7 @@
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.apache.hadoop.hbase.wal.WALSplitUtil;
import org.apache.hadoop.hbase.wal.WALSplitUtil.MutationReplay;
import org.apache.hadoop.hbase.wal.WALStreamReader;
import org.apache.hadoop.util.StringUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
@@ -268,6 +269,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
public static final String SPECIAL_RECOVERED_EDITS_DIR =
"hbase.hregion.special.recovered.edits.dir";

/**
* Mainly used for master local region, where we will replay the WAL file directly without
* splitting, so it is possible to have WAL files which are not closed cleanly, in this way,
* hitting EOF is expected so should not consider it as a critical problem.
*/
public static final String RECOVERED_EDITS_IGNORE_EOF =
"hbase.hregion.recovered.edits.ignore.eof";

/**
* Whether to use {@link MetaCellComparator} even if we are not meta region. Used when creating
* master local region.
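As a usage note, here is a small hedged sketch of opting into the new flag defined above. It mirrors the conf.setBoolean call added earlier in this diff for the recovered-edits replay path; the class and method names of the sketch are placeholders and the surrounding setup is omitted.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.regionserver.HRegion;

public final class IgnoreEofConfigSketch {
  static Configuration replayConf() {
    Configuration conf = HBaseConfiguration.create();
    // We replay un-split WAL files directly (no WAL splitting step), so an EOF at
    // the tail of the last, possibly uncleanly closed, file is expected; tell
    // HRegion's recovered-edits replay to tolerate it instead of moving the file aside.
    conf.setBoolean(HRegion.RECOVERED_EDITS_IGNORE_EOF, true);
    return conf;
  }
}
```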
@@ -5533,9 +5542,7 @@ private long replayRecoveredEdits(final Path edits, Map<byte[], Long> maxSeqIdIn
MonitoredTask status = TaskMonitor.get().createStatus(msg);

status.setStatus("Opening recovered edits");
WAL.Reader reader = null;
try {
reader = WALFactory.createReader(fs, edits, conf);
try (WALStreamReader reader = WALFactory.createStreamReader(fs, edits, conf)) {
long currentEditSeqId = -1;
long currentReplaySeqId = -1;
long firstSeqIdInLog = -1;
@@ -5689,12 +5696,17 @@ private long replayRecoveredEdits(final Path edits, Map<byte[], Long> maxSeqIdIn
coprocessorHost.postReplayWALs(this.getRegionInfo(), edits);
}
} catch (EOFException eof) {
Path p = WALSplitUtil.moveAsideBadEditsFile(walFS, edits);
Contributor: Have we never ignored EOF before?

Contributor (Author): In the old ProtobufLogReader implementation, we usually did not throw the EOFException out, so in the past this was not a big problem. But anyway, we could open an issue to backport this change to branch-2.x.

msg = "EnLongAddered EOF. Most likely due to Master failure during "
+ "wal splitting, so we have this data in another edit. Continuing, but renaming " + edits
+ " as " + p + " for region " + this;
LOG.warn(msg, eof);
status.abort(msg);
if (!conf.getBoolean(RECOVERED_EDITS_IGNORE_EOF, false)) {
Path p = WALSplitUtil.moveAsideBadEditsFile(walFS, edits);
msg = "EnLongAddered EOF. Most likely due to Master failure during "
+ "wal splitting, so we have this data in another edit. Continuing, but renaming "
+ edits + " as " + p + " for region " + this;
LOG.warn(msg, eof);
status.abort(msg);
} else {
LOG.warn("EOF while replaying recover edits and config '{}' is true so "
+ "we will ignore it and continue", RECOVERED_EDITS_IGNORE_EOF, eof);
}
} catch (IOException ioe) {
// If the IOE resulted from bad file format,
// then this problem is idempotent and retrying won't help
Expand All @@ -5721,9 +5733,6 @@ private long replayRecoveredEdits(final Path edits, Map<byte[], Long> maxSeqIdIn
return currentEditSeqId;
} finally {
status.cleanup();
if (reader != null) {
reader.close();
}
}
}

@@ -110,8 +110,9 @@
* <code>F</code> when all of the edits in <code>F</code> have a log-sequence-id that's older
* (smaller) than the most-recent flush.
* <p>
* To read an WAL, call
* {@link WALFactory#createReader(org.apache.hadoop.fs.FileSystem, org.apache.hadoop.fs.Path)}. *
* To read an WAL, call {@link WALFactory#createStreamReader(FileSystem, Path)} for one way read,
* call {@link WALFactory#createTailingReader(FileSystem, Path, Configuration, long)} for
* replication where we may want to tail the active WAL file.
* <h2>Failure Semantic</h2> If an exception on append or sync, roll the WAL because the current WAL
* is now a lame duck; any more appends or syncs will fail also with the same original exception. If
* we have made successful appends to the WAL and we then are unable to sync them, our current
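To make the two read paths in the updated javadoc concrete, a small sketch assuming only the factory methods named there. The tailing reader's type and read/retry loop are not part of this excerpt, so only the creation call is shown; the helper names are placeholders.

```java
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALStreamReader;

public final class WalReaderChoiceSketch {
  /** One-shot read of a completed WAL file, as tools like WALPrettyPrinter do. */
  static void readOnce(Configuration conf, FileSystem fs, Path path) throws IOException {
    try (WALStreamReader reader = WALFactory.createStreamReader(fs, path, conf)) {
      for (WAL.Entry entry; (entry = reader.next()) != null;) {
        // process entry
      }
    }
  }

  /** Replication-style consumer that follows a WAL which may still be written to. */
  static void openTailing(Configuration conf, FileSystem fs, Path path, long startPosition)
    throws IOException {
    // Only the creation call from the javadoc is shown; a real caller would keep the
    // returned tailing reader, drive its read loop, and close it when done.
    WALFactory.createTailingReader(fs, path, conf, startPosition);
  }
}
```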
@@ -17,8 +17,10 @@
*/
package org.apache.hadoop.hbase.regionserver.wal;

import static org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader.DEFAULT_WAL_TRAILER_WARN_SIZE;
import static org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader.WAL_TRAILER_WARN_SIZE;
import static org.apache.hadoop.hbase.regionserver.wal.AbstractProtobufWALReader.DEFAULT_WAL_TRAILER_WARN_SIZE;
import static org.apache.hadoop.hbase.regionserver.wal.AbstractProtobufWALReader.PB_WAL_COMPLETE_MAGIC;
import static org.apache.hadoop.hbase.regionserver.wal.AbstractProtobufWALReader.PB_WAL_MAGIC;
import static org.apache.hadoop.hbase.regionserver.wal.AbstractProtobufWALReader.WAL_TRAILER_WARN_SIZE;

import java.io.IOException;
import java.io.OutputStream;
@@ -185,8 +187,7 @@ public void init(FileSystem fs, Path path, Configuration conf, boolean overwrita
headerBuilder.setValueCompressionAlgorithm(
CompressionContext.getValueCompressionAlgorithm(conf).ordinal());
}
length.set(writeMagicAndWALHeader(ProtobufLogReader.PB_WAL_MAGIC,
buildWALHeader(conf, headerBuilder)));
length.set(writeMagicAndWALHeader(PB_WAL_MAGIC, buildWALHeader(conf, headerBuilder)));

initAfterHeader(doCompress);

@@ -257,7 +258,7 @@ protected void writeWALTrailer() {
LOG.warn("Please investigate WALTrailer usage. Trailer size > maximum size : " + trailerSize
+ " > " + this.trailerWarnSize);
}
length.set(writeWALTrailerAndMagic(trailer, ProtobufLogReader.PB_WAL_COMPLETE_MAGIC));
length.set(writeWALTrailerAndMagic(trailer, PB_WAL_COMPLETE_MAGIC));
this.trailerWritten = true;
} catch (IOException ioe) {
LOG.warn("Failed to write trailer, non-fatal, continuing...", ioe);