Skip to content

Commit

Permalink
HBASE-27632 Refactor WAL.Reader implementation so we can better suppo…
Browse files Browse the repository at this point in the history
…rt WAL splitting and replication
  • Loading branch information
Apache9 committed Mar 8, 2023
1 parent 8bdabed commit 8eedc9f
Show file tree
Hide file tree
Showing 63 changed files with 2,566 additions and 2,096 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,9 @@
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.io.crypto.KeyProviderForTesting;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.regionserver.wal.SecureProtobufLogReader;
import org.apache.hadoop.hbase.regionserver.wal.SecureProtobufLogWriter;
import org.apache.hadoop.hbase.testclassification.IntegrationTests;
import org.apache.hadoop.hbase.util.EncryptionTest;
import org.apache.hadoop.hbase.wal.WAL.Reader;
import org.apache.hadoop.hbase.wal.WALProvider.Writer;
import org.apache.hadoop.util.ToolRunner;
import org.junit.Before;
Expand All @@ -54,8 +52,6 @@ public void setUpCluster() throws Exception {
conf.setInt(HFile.FORMAT_VERSION_KEY, 3);
conf.set(HConstants.CRYPTO_KEYPROVIDER_CONF_KEY, KeyProviderForTesting.class.getName());
conf.set(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY, "hbase");
conf.setClass("hbase.regionserver.hlog.reader.impl", SecureProtobufLogReader.class,
Reader.class);
conf.setClass("hbase.regionserver.hlog.writer.impl", SecureProtobufLogWriter.class,
Writer.class);
conf.setBoolean(HConstants.ENABLE_WAL_ENCRYPTION, true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,14 @@
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.hbase.util.LeaseNotRecoveredException;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WAL.Reader;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.hbase.wal.WALStreamReader;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
Expand Down Expand Up @@ -135,7 +137,7 @@ public String toString() {
* HLogInputFormat.
*/
static abstract class WALRecordReader<K extends WALKey> extends RecordReader<K, WALEdit> {
private Reader reader = null;
private WALStreamReader reader = null;
// visible until we can remove the deprecated HLogInputFormat
Entry currentEntry = new Entry();
private long startTime;
Expand All @@ -144,6 +146,47 @@ static abstract class WALRecordReader<K extends WALKey> extends RecordReader<K,
private Path logFile;
private long currentPos;

@edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "DCN_NULLPOINTER_EXCEPTION",
justification = "HDFS-4380")
private WALStreamReader openReader(Path path, long startPosition) throws IOException {
long retryInterval = 2000; // 2 sec
int maxAttempts = 30;
int attempt = 0;
Exception ee = null;
WALStreamReader reader = null;
while (reader == null && attempt++ < maxAttempts) {
try {
// Detect if this is a new file, if so get a new reader else
// reset the current reader so that we see the new data
reader =
WALFactory.createStreamReader(path.getFileSystem(conf), path, conf, startPosition);
return reader;
} catch (LeaseNotRecoveredException lnre) {
// HBASE-15019 the WAL was not closed due to some hiccup.
LOG.warn("Try to recover the WAL lease " + path, lnre);
AbstractFSWALProvider.recoverLease(conf, path);
reader = null;
ee = lnre;
} catch (NullPointerException npe) {
// Workaround for race condition in HDFS-4380
// which throws a NPE if we open a file before any data node has the most recent block
// Just sleep and retry. Will require re-reading compressed WALs for compressionContext.
LOG.warn("Got NPE opening reader, will retry.");
reader = null;
ee = npe;
}
if (reader == null) {
// sleep before next attempt
try {
Thread.sleep(retryInterval);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
throw new IOException("Could not open reader", ee);
}

@Override
public void initialize(InputSplit split, TaskAttemptContext context)
throws IOException, InterruptedException {
Expand All @@ -158,8 +201,7 @@ public void initialize(InputSplit split, TaskAttemptContext context)

private void openReader(Path path) throws IOException {
closeReader();
reader = AbstractFSWALProvider.openReader(path, conf);
seek();
reader = openReader(path, currentPos > 0 ? currentPos : -1);
setCurrentPath(path);
}

Expand All @@ -174,12 +216,6 @@ private void closeReader() throws IOException {
}
}

private void seek() throws IOException {
if (currentPos != 0) {
reader.seek(currentPos);
}
}

@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
if (reader == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,9 @@ private static HRegion open(Configuration conf, TableDescriptor td, RegionInfo r
WAL wal = createWAL(walFactory, walRoller, serverName, walFs, walRootDir, regionInfo);
conf.set(HRegion.SPECIAL_RECOVERED_EDITS_DIR,
replayEditsDir.makeQualified(walFs.getUri(), walFs.getWorkingDirectory()).toString());
// we do not do WAL splitting here so it is possible to have uncleanly closed WAL files, so we
// need to ignore EOFException.
conf.setBoolean(HRegion.RECOVERED_EDITS_IGNORE_EOF, true);
return HRegion.openHRegionFromTableDir(conf, fs, tableDir, regionInfo, td, wal, null, null);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.hbase.wal.WALPrettyPrinter;
import org.apache.hadoop.hbase.wal.WALStreamReader;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;

Expand Down Expand Up @@ -88,7 +89,7 @@ protected void processOptions(CommandLine cmd) {
protected int doWork() throws Exception {
Path path = new Path(file);
FileSystem fs = path.getFileSystem(conf);
try (WAL.Reader reader = WALFactory.createReader(fs, path, conf)) {
try (WALStreamReader reader = WALFactory.createStreamReader(fs, path, conf)) {
for (;;) {
WAL.Entry entry = reader.next();
if (entry == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.apache.hadoop.hbase.wal.WALSplitUtil;
import org.apache.hadoop.hbase.wal.WALSplitUtil.MutationReplay;
import org.apache.hadoop.hbase.wal.WALStreamReader;
import org.apache.hadoop.util.StringUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
Expand Down Expand Up @@ -268,6 +269,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
public static final String SPECIAL_RECOVERED_EDITS_DIR =
"hbase.hregion.special.recovered.edits.dir";

/**
* Mainly used for master local region, where we will replay the WAL file directly without
* splitting, so it is possible to have WAL files which are not closed cleanly, in this way,
* hitting EOF is expected so should not consider it as a critical problem.
*/
public static final String RECOVERED_EDITS_IGNORE_EOF =
"hbase.hregion.recovered.edits.ignore.eof";

/**
* Whether to use {@link MetaCellComparator} even if we are not meta region. Used when creating
* master local region.
Expand Down Expand Up @@ -5533,9 +5542,7 @@ private long replayRecoveredEdits(final Path edits, Map<byte[], Long> maxSeqIdIn
MonitoredTask status = TaskMonitor.get().createStatus(msg);

status.setStatus("Opening recovered edits");
WAL.Reader reader = null;
try {
reader = WALFactory.createReader(fs, edits, conf);
try (WALStreamReader reader = WALFactory.createStreamReader(fs, edits, conf)) {
long currentEditSeqId = -1;
long currentReplaySeqId = -1;
long firstSeqIdInLog = -1;
Expand Down Expand Up @@ -5689,12 +5696,17 @@ private long replayRecoveredEdits(final Path edits, Map<byte[], Long> maxSeqIdIn
coprocessorHost.postReplayWALs(this.getRegionInfo(), edits);
}
} catch (EOFException eof) {
Path p = WALSplitUtil.moveAsideBadEditsFile(walFS, edits);
msg = "EnLongAddered EOF. Most likely due to Master failure during "
+ "wal splitting, so we have this data in another edit. Continuing, but renaming " + edits
+ " as " + p + " for region " + this;
LOG.warn(msg, eof);
status.abort(msg);
if (!conf.getBoolean(RECOVERED_EDITS_IGNORE_EOF, false)) {
Path p = WALSplitUtil.moveAsideBadEditsFile(walFS, edits);
msg = "EnLongAddered EOF. Most likely due to Master failure during "
+ "wal splitting, so we have this data in another edit. Continuing, but renaming "
+ edits + " as " + p + " for region " + this;
LOG.warn(msg, eof);
status.abort(msg);
} else {
LOG.warn("EOF while replaying recover edits and config '{}' is true so "
+ "we will ignore it and continue", RECOVERED_EDITS_IGNORE_EOF, eof);
}
} catch (IOException ioe) {
// If the IOE resulted from bad file format,
// then this problem is idempotent and retrying won't help
Expand All @@ -5721,9 +5733,6 @@ private long replayRecoveredEdits(final Path edits, Map<byte[], Long> maxSeqIdIn
return currentEditSeqId;
} finally {
status.cleanup();
if (reader != null) {
reader.close();
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,9 @@
* <code>F</code> when all of the edits in <code>F</code> have a log-sequence-id that's older
* (smaller) than the most-recent flush.
* <p>
* To read an WAL, call
* {@link WALFactory#createReader(org.apache.hadoop.fs.FileSystem, org.apache.hadoop.fs.Path)}. *
* To read an WAL, call {@link WALFactory#createStreamReader(FileSystem, Path)} for one way read,
* call {@link WALFactory#createTailingReader(FileSystem, Path, Configuration, long)} for
* replication where we may want to tail the active WAL file.
* <h2>Failure Semantic</h2> If an exception on append or sync, roll the WAL because the current WAL
* is now a lame duck; any more appends or syncs will fail also with the same original exception. If
* we have made successful appends to the WAL and we then are unable to sync them, our current
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@
*/
package org.apache.hadoop.hbase.regionserver.wal;

import static org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader.DEFAULT_WAL_TRAILER_WARN_SIZE;
import static org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader.WAL_TRAILER_WARN_SIZE;
import static org.apache.hadoop.hbase.regionserver.wal.AbstractProtobufWALReader.DEFAULT_WAL_TRAILER_WARN_SIZE;
import static org.apache.hadoop.hbase.regionserver.wal.AbstractProtobufWALReader.PB_WAL_COMPLETE_MAGIC;
import static org.apache.hadoop.hbase.regionserver.wal.AbstractProtobufWALReader.PB_WAL_MAGIC;
import static org.apache.hadoop.hbase.regionserver.wal.AbstractProtobufWALReader.WAL_TRAILER_WARN_SIZE;

import java.io.IOException;
import java.io.OutputStream;
Expand Down Expand Up @@ -185,8 +187,7 @@ public void init(FileSystem fs, Path path, Configuration conf, boolean overwrita
headerBuilder.setValueCompressionAlgorithm(
CompressionContext.getValueCompressionAlgorithm(conf).ordinal());
}
length.set(writeMagicAndWALHeader(ProtobufLogReader.PB_WAL_MAGIC,
buildWALHeader(conf, headerBuilder)));
length.set(writeMagicAndWALHeader(PB_WAL_MAGIC, buildWALHeader(conf, headerBuilder)));

initAfterHeader(doCompress);

Expand Down Expand Up @@ -257,7 +258,7 @@ protected void writeWALTrailer() {
LOG.warn("Please investigate WALTrailer usage. Trailer size > maximum size : " + trailerSize
+ " > " + this.trailerWarnSize);
}
length.set(writeWALTrailerAndMagic(trailer, ProtobufLogReader.PB_WAL_COMPLETE_MAGIC));
length.set(writeWALTrailerAndMagic(trailer, PB_WAL_COMPLETE_MAGIC));
this.trailerWritten = true;
} catch (IOException ioe) {
LOG.warn("Failed to write trailer, non-fatal, continuing...", ioe);
Expand Down
Loading

0 comments on commit 8eedc9f

Please sign in to comment.