Skip to content

Commit

Permalink
HBASE-27644 Should not return false when WALKey has no following KVs …
Browse files Browse the repository at this point in the history
…while reading WAL file (#5032)

Signed-off-by: Viraj Jasani <[email protected]>
(cherry picked from commit 4a9cf99)
  • Loading branch information
Apache9 committed Feb 26, 2023
1 parent a6ca9cb commit 82c4cce
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -408,10 +408,9 @@ protected boolean readNext(Entry entry) throws IOException {
WALKey walKey = builder.build();
entry.getKey().readFieldsFromPb(walKey, this.byteStringUncompressor);
if (!walKey.hasFollowingKvCount() || 0 == walKey.getFollowingKvCount()) {
LOG.trace("WALKey has no KVs that follow it; trying the next one. current offset={}",
LOG.debug("WALKey has no KVs that follow it; trying the next one. current offset={}",
this.inputStream.getPos());
seekOnFs(originalPosition);
return false;
return true;
}
int expectedCells = walKey.getFollowingKvCount();
long posBefore = this.inputStream.getPos();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ public Integer run() throws Exception {
* log entry. Does its writing as an alternate user in another filesystem instance to simulate
* better it being a regionserver.
*/
class ZombieLastLogWriterRegionServer extends Thread {
private class ZombieLastLogWriterRegionServer extends Thread {
final AtomicLong editsCount;
final AtomicBoolean stop;
final int numOfWriters;
Expand Down Expand Up @@ -402,10 +402,6 @@ public void testOldRecoveredEditsFileSidelined() throws IOException {

private Path createRecoveredEditsPathForRegion() throws IOException {
byte[] encoded = RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedNameAsBytes();
long now = EnvironmentEdgeManager.currentTime();
Entry entry = new Entry(
new WALKeyImpl(encoded, TableName.META_TABLE_NAME, 1, now, HConstants.DEFAULT_CLUSTER_ID),
new WALEdit());
Path p = WALSplitUtil.getRegionSplitEditsPath(TableName.META_TABLE_NAME, encoded, 1,
FILENAME_BEING_SPLIT, TMPDIRNAME, conf);
return p;
Expand Down Expand Up @@ -491,10 +487,10 @@ public void testSplitLeavesCompactionEventsEdits() throws IOException {
assertEquals(11, countWAL(splitLog[0]));
}

/*
/**
* Tests that WalSplitter ignores replication marker edits.
*/
@Test(timeout = 30000)
@Test
public void testSplitRemovesReplicationMarkerEdits() throws IOException {
RegionInfo regionInfo = ReplicationMarkerChore.REGION_INFO;
Path path = new Path(WALDIR, WAL_FILE_PREFIX + "1");
Expand Down Expand Up @@ -1206,7 +1202,44 @@ public void testRecoveredEditsStoragePolicy() throws IOException {
} finally {
conf.unset(HConstants.WAL_STORAGE_POLICY);
}
}

/**
* See HBASE-27644, typically we should not have empty WALEdit but we should be able to process
* it, instead of losing data after it.
*/
@Test
public void testEmptyWALEdit() throws IOException {
final String region = "region__5";
REGIONS.clear();
REGIONS.add(region);
makeRegionDirs(REGIONS);
fs.mkdirs(WALDIR);
Path path = new Path(WALDIR, WAL_FILE_PREFIX + 5);
generateEmptyEditWAL(path, Bytes.toBytes(region));
useDifferentDFSClient();

Path regiondir = new Path(TABLEDIR, region);
fs.mkdirs(regiondir);
List<Path> splitPaths = WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
// Make sure that WALSplitter generate the split file
assertEquals(1, splitPaths.size());

Path originalLog = (fs.listStatus(OLDLOGDIR))[0].getPath();
assertEquals(11, countWAL(originalLog));
// we will skip the empty WAL when splitting
assertEquals(10, countWAL(splitPaths.get(0)));
}

private void generateEmptyEditWAL(Path path, byte[] region) throws IOException {
fs.mkdirs(WALDIR);
try (Writer writer = wals.createWALWriter(fs, path)) {
long seq = 0;
appendEmptyEntry(writer, TABLE_NAME, region, seq++);
for (int i = 0; i < 10; i++) {
appendEntry(writer, TABLE_NAME, region, Bytes.toBytes(i), FAMILY, QUALIFIER, VALUE, seq++);
}
}
}

private Writer generateWALs(int leaveOpen) throws IOException {
Expand Down Expand Up @@ -1399,7 +1432,7 @@ private static void appendRegionEvent(Writer w, String region) throws IOExceptio
w.sync(false);
}

public static long appendEntry(Writer writer, TableName table, byte[] region, byte[] row,
private static long appendEntry(Writer writer, TableName table, byte[] region, byte[] row,
byte[] family, byte[] qualifier, byte[] value, long seq) throws IOException {
LOG.info(Thread.currentThread().getName() + " append");
writer.append(createTestEntry(table, region, row, family, qualifier, value, seq));
Expand All @@ -1412,13 +1445,27 @@ private static Entry createTestEntry(TableName table, byte[] region, byte[] row,
byte[] qualifier, byte[] value, long seq) {
long time = System.nanoTime();

seq++;
final KeyValue cell = new KeyValue(row, family, qualifier, time, KeyValue.Type.Put, value);
WALEdit edit = new WALEdit();
edit.add(cell);
return new Entry(new WALKeyImpl(region, table, seq, time, HConstants.DEFAULT_CLUSTER_ID), edit);
}

private static long appendEmptyEntry(Writer writer, TableName table, byte[] region, long seq)
throws IOException {
LOG.info(Thread.currentThread().getName() + " append");
writer.append(createEmptyEntry(table, region, seq));
LOG.info(Thread.currentThread().getName() + " sync");
writer.sync(false);
return seq;
}

private static Entry createEmptyEntry(TableName table, byte[] region, long seq) {
long time = System.nanoTime();
return new Entry(new WALKeyImpl(region, table, seq, time, HConstants.DEFAULT_CLUSTER_ID),
new WALEdit());
}

private void injectEmptyFile(String suffix, boolean closeFile) throws IOException {
Writer writer =
WALFactory.createWALWriter(fs, new Path(WALDIR, WAL_FILE_PREFIX + suffix), conf);
Expand All @@ -1428,22 +1475,19 @@ private void injectEmptyFile(String suffix, boolean closeFile) throws IOExceptio
}

private boolean logsAreEqual(Path p1, Path p2) throws IOException {
Reader in1, in2;
in1 = wals.createReader(fs, p1);
in2 = wals.createReader(fs, p2);
Entry entry1;
Entry entry2;
while ((entry1 = in1.next()) != null) {
entry2 = in2.next();
if (
(entry1.getKey().compareTo(entry2.getKey()) != 0)
|| (!entry1.getEdit().toString().equals(entry2.getEdit().toString()))
) {
return false;
try (Reader in1 = wals.createReader(fs, p1); Reader in2 = wals.createReader(fs, p2)) {
Entry entry1;
Entry entry2;
while ((entry1 = in1.next()) != null) {
entry2 = in2.next();
if (
(entry1.getKey().compareTo(entry2.getKey()) != 0)
|| (!entry1.getEdit().toString().equals(entry2.getEdit().toString()))
) {
return false;
}
}
}
in1.close();
in2.close();
return true;
}
}

0 comments on commit 82c4cce

Please sign in to comment.