HBASE-27621 Always use findEntry to fill the Dictionary when reading compressed WAL file

Apache9 committed Feb 8, 2023
1 parent 1a9e465 commit a084db5
Showing 5 changed files with 278 additions and 12 deletions.
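
The core of the fix is which Dictionary call fills the read-side dictionary. Below is a toy sketch of the difference (illustration only; HBase's real Dictionary is LRU-backed and operates on byte ranges, not strings): when the WAL reader hits an EOFException mid-entry and replays the same bytes after a reset, the fill path runs again for values it has already seen, so the call must be idempotent.

import java.util.ArrayList;
import java.util.List;

// Toy model of the two Dictionary calls this commit swaps. addEntry always
// appends, so replaying the same data duplicates entries and shifts every
// later index; findEntry returns the existing index, so a replay is harmless.
class ToyDictionary {
  private final List<String> entries = new ArrayList<>();

  short addEntry(String e) { // always appends a new slot
    entries.add(e);
    return (short) (entries.size() - 1);
  }

  short findEntry(String e) { // reuses the existing slot if present
    int idx = entries.indexOf(e);
    return idx >= 0 ? (short) idx : addEntry(e);
  }

  public static void main(String[] args) {
    ToyDictionary replayedWithAdd = new ToyDictionary();
    System.out.println(replayedWithAdd.addEntry("family"));   // 0
    System.out.println(replayedWithAdd.addEntry("family"));   // 1 -- indices drift

    ToyDictionary replayedWithFind = new ToyDictionary();
    System.out.println(replayedWithFind.findEntry("family")); // 0
    System.out.println(replayedWithFind.findEntry("family")); // 0 -- safe replay
  }
}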
TagCompressionContext.java
@@ -107,15 +107,15 @@ public void uncompressTags(InputStream src, byte[] dest, int offset, int length)
throws IOException {
int endOffset = offset + length;
while (offset < endOffset) {
- byte status = (byte) src.read();
+ byte status = StreamUtils.readByte(src);
if (status == Dictionary.NOT_IN_DICTIONARY) {
int tagLen = StreamUtils.readRawVarint32(src);
offset = Bytes.putAsShort(dest, offset, tagLen);
IOUtils.readFully(src, dest, offset, tagLen);
- tagDict.addEntry(dest, offset, tagLen);
+ tagDict.findEntry(dest, offset, tagLen);
offset += tagLen;
} else {
- short dictIdx = StreamUtils.toShort(status, (byte) src.read());
+ short dictIdx = StreamUtils.toShort(status, StreamUtils.readByte(src));
byte[] entry = tagDict.getEntry(dictIdx);
if (entry == null) {
throw new IOException("Missing dictionary entry for index " + dictIdx);
@@ -145,7 +145,7 @@ public int uncompressTags(ByteBuff src, byte[] dest, int offset, int length) throws IOException {
tagLen = StreamUtils.readRawVarint32(src);
offset = Bytes.putAsShort(dest, offset, tagLen);
src.get(dest, offset, tagLen);
- tagDict.addEntry(dest, offset, tagLen);
+ tagDict.findEntry(dest, offset, tagLen);
offset += tagLen;
} else {
short dictIdx = StreamUtils.toShort(status, src.get());
StreamUtils.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase.io.util;

+ import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@@ -206,6 +207,22 @@ public static Pair<Integer, Integer> readRawVarint32(ByteBuffer input, int offset)
return new Pair<>(result, newOffset - offset);
}

+ /**
+  * Read a byte from the given stream using the read method, and throw EOFException if it returns
+  * -1, like the implementation in {@code DataInputStream}.
+  * <p/>
+  * This is useful because casting the return value of read directly to byte loses the ability to
+  * distinguish a byte whose value is -1 (0xFF) from end of stream, as casting the int -1 to byte
+  * also yields -1.
+  */
+ public static byte readByte(InputStream in) throws IOException {
+   int r = in.read();
+   if (r < 0) {
+     throw new EOFException();
+   }
+   return (byte) r;
+ }

public static short toShort(byte hi, byte lo) {
short s = (short) (((hi & 0xFF) << 8) | (lo & 0xFF));
Preconditions.checkArgument(s >= 0);
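The new StreamUtils.readByte addresses the cast pitfall its javadoc describes. A minimal standalone demonstration (illustration only; the class name is made up): InputStream.read() returns an int in [0, 255] or -1 at end of stream, so casting the result straight to byte maps a stored 0xFF byte and EOF to the same value, -1, which is also what the decompression code compares against Dictionary.NOT_IN_DICTIONARY.

import java.io.ByteArrayInputStream;
import java.io.InputStream;

public class ReadByteAmbiguity {
  public static void main(String[] args) throws Exception {
    InputStream in = new ByteArrayInputStream(new byte[] { (byte) 0xFF });
    int first = in.read();  // 255: a real byte whose value is 0xFF
    int second = in.read(); // -1: end of stream
    // After a byte cast the two cases are indistinguishable:
    System.out.println((byte) first == (byte) second); // true
  }
}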
Compressor.java
@@ -108,7 +108,9 @@ static byte[] readCompressed(DataInput in, Dictionary dict) throws IOException {
// if this isn't in the dictionary, we need to add to the dictionary.
byte[] arr = new byte[length];
in.readFully(arr);
- if (dict != null) dict.addEntry(arr, 0, length);
+ if (dict != null) {
+   dict.findEntry(arr, 0, length);
+ }
return arr;
} else {
// Status here is the higher-order byte of index of the dictionary entry
@@ -141,7 +143,7 @@ static int uncompressIntoArray(byte[] to, int offset, DataInput in, Dictionary dict)
// if this isn't in the dictionary, we need to add to the dictionary.
int length = WritableUtils.readVInt(in);
in.readFully(to, offset, length);
- dict.addEntry(to, offset, length);
+ dict.findEntry(to, offset, length);
return length;
} else {
// the status byte also acts as the higher order byte of the dictionary
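As the comments in readCompressed and uncompressIntoArray note, a status byte other than Dictionary.NOT_IN_DICTIONARY doubles as the high-order byte of the dictionary index, with the next byte read supplying the low-order half, exactly as StreamUtils.toShort combines them. A small worked example (illustration only):

public class DictIndexDecoding {
  public static void main(String[] args) {
    byte status = 0x01;    // first byte read; not the NOT_IN_DICTIONARY marker
    byte lo = (byte) 0x02; // next byte from the stream
    // Same combination as StreamUtils.toShort(status, lo):
    short dictIdx = (short) (((status & 0xFF) << 8) | (lo & 0xFF));
    System.out.println(dictIdx); // 258, i.e. 0x0102
  }
}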
WALCellCodec.java
@@ -197,18 +197,20 @@ public byte[] uncompress(ByteString data, Enum dictIndex) {

private static byte[] uncompressByteString(ByteString bs, Dictionary dict) throws IOException {
InputStream in = bs.newInput();
- byte status = (byte) in.read();
+ byte status = StreamUtils.readByte(in);
if (status == Dictionary.NOT_IN_DICTIONARY) {
byte[] arr = new byte[StreamUtils.readRawVarint32(in)];
int bytesRead = in.read(arr);
if (bytesRead != arr.length) {
throw new IOException("Cannot read; wanted " + arr.length + ", but got " + bytesRead);
}
- if (dict != null) dict.addEntry(arr, 0, arr.length);
+ if (dict != null) {
+   dict.findEntry(arr, 0, arr.length);
+ }
return arr;
} else {
// Status here is the higher-order byte of index of the dictionary entry.
- short dictIdx = StreamUtils.toShort(status, (byte) in.read());
+ short dictIdx = StreamUtils.toShort(status, StreamUtils.readByte(in));
byte[] entry = dict.getEntry(dictIdx);
if (entry == null) {
throw new IOException("Missing dictionary entry for index " + dictIdx);
@@ -350,17 +352,17 @@ protected Cell parseCell() throws IOException {
}

private int readIntoArray(byte[] to, int offset, Dictionary dict) throws IOException {
- byte status = (byte) in.read();
+ byte status = StreamUtils.readByte(in);
if (status == Dictionary.NOT_IN_DICTIONARY) {
// status byte indicating that data to be read is not in dictionary.
// if this isn't in the dictionary, we need to add to the dictionary.
int length = StreamUtils.readRawVarint32(in);
IOUtils.readFully(in, to, offset, length);
- dict.addEntry(to, offset, length);
+ dict.findEntry(to, offset, length);
return length;
} else {
// the status byte also acts as the higher order byte of the dictionary entry.
- short dictIdx = StreamUtils.toShort(status, (byte) in.read());
+ short dictIdx = StreamUtils.toShort(status, StreamUtils.readByte(in));
byte[] entry = dict.getEntry(dictIdx);
if (entry == null) {
throw new IOException("Missing dictionary entry for index " + dictIdx);
TestWALEntryStreamCompressionReset.java
@@ -0,0 +1,245 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication.regionserver;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.OptionalLong;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellBuilderFactory;
import org.apache.hadoop.hbase.CellBuilderType;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogWriter;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;

import org.apache.hbase.thirdparty.com.google.common.io.ByteStreams;

/**
 * Enable compression and reset the WALEntryStream while reading in ReplicationSourceWALReader.
 * <p/>
 * This is used to confirm that we still work correctly when hitting an EOFException in the middle
 * of reading a WAL entry while compression is enabled. See HBASE-27621 for more details.
 */
@Category({ ReplicationTests.class, MediumTests.class })
public class TestWALEntryStreamCompressionReset {

private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();

private static TableName TABLE_NAME = TableName.valueOf("reset");

private static RegionInfo REGION_INFO = RegionInfoBuilder.newBuilder(TABLE_NAME).build();

private static byte[] FAMILY = Bytes.toBytes("family");

private static MultiVersionConcurrencyControl MVCC = new MultiVersionConcurrencyControl();

private static NavigableMap<byte[], Integer> SCOPE;

private static String GROUP_ID = "group";

private static FileSystem FS;

private static ReplicationSource SOURCE;

private static MetricsSource METRICS_SOURCE;

private static ReplicationSourceLogQueue LOG_QUEUE;

private static Path TEMPLATE_WAL_FILE;

private static int END_OFFSET_OF_WAL_ENTRIES;

private static Path WAL_FILE;

private static volatile long WAL_LENGTH;

private static ReplicationSourceWALReader READER;

// return the wal path, and also the end offset of the last wal entry
private static Pair<Path, Long> generateWAL() throws Exception {
Path path = UTIL.getDataTestDir("wal");
ProtobufLogWriter writer = new ProtobufLogWriter();
writer.init(FS, path, UTIL.getConfiguration(), false, FS.getDefaultBlockSize(path), null);
for (int i = 0; i < Byte.MAX_VALUE; i++) {
WALEdit edit = new WALEdit();
edit.add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setType(Cell.Type.Put)
.setRow(Bytes.toBytes(i)).setFamily(FAMILY).setQualifier(Bytes.toBytes("qualifier-" + i))
.setValue(Bytes.toBytes("v-" + i)).build());
writer.append(new WAL.Entry(new WALKeyImpl(REGION_INFO.getEncodedNameAsBytes(), TABLE_NAME,
EnvironmentEdgeManager.currentTime(), MVCC, SCOPE), edit));
}

WALEdit edit2 = new WALEdit();
edit2.add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setType(Cell.Type.Put)
.setRow(Bytes.toBytes(-1)).setFamily(FAMILY).setQualifier(Bytes.toBytes("qualifier"))
.setValue(Bytes.toBytes("vv")).build());
edit2.add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setType(Cell.Type.Put)
.setRow(Bytes.toBytes(-1)).setFamily(FAMILY).setQualifier(Bytes.toBytes("qualifier-1"))
.setValue(Bytes.toBytes("vvv")).build());
writer.append(new WAL.Entry(new WALKeyImpl(REGION_INFO.getEncodedNameAsBytes(), TABLE_NAME,
EnvironmentEdgeManager.currentTime(), MVCC, SCOPE), edit2));
writer.sync(false);
long offset = writer.getSyncedLength();
writer.close();
return Pair.newPair(path, offset);
}

@BeforeClass
public static void setUp() throws Exception {
Configuration conf = UTIL.getConfiguration();
FS = UTIL.getTestFileSystem();
conf.setBoolean(HConstants.ENABLE_WAL_COMPRESSION, true);
conf.setBoolean(CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE, false);
conf.setInt("replication.source.maxretriesmultiplier", 1);
FS.mkdirs(UTIL.getDataTestDir());
Pair<Path, Long> pair = generateWAL();
TEMPLATE_WAL_FILE = pair.getFirst();
END_OFFSET_OF_WAL_ENTRIES = pair.getSecond().intValue();
WAL_FILE = UTIL.getDataTestDir("rep_source");

METRICS_SOURCE = new MetricsSource("reset");
SOURCE = mock(ReplicationSource.class);
when(SOURCE.isPeerEnabled()).thenReturn(true);
when(SOURCE.getWALFileLengthProvider()).thenReturn(p -> OptionalLong.of(WAL_LENGTH));
when(SOURCE.getServerWALsBelongTo())
.thenReturn(ServerName.valueOf("localhost", 12345, EnvironmentEdgeManager.currentTime()));
when(SOURCE.getSourceMetrics()).thenReturn(METRICS_SOURCE);
ReplicationSourceManager rsm = mock(ReplicationSourceManager.class);
when(rsm.getTotalBufferUsed()).thenReturn(new AtomicLong());
when(rsm.getTotalBufferLimit())
.thenReturn((long) HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_DFAULT);
when(rsm.getGlobalMetrics()).thenReturn(mock(MetricsReplicationGlobalSourceSource.class));
when(SOURCE.getSourceManager()).thenReturn(rsm);

LOG_QUEUE = new ReplicationSourceLogQueue(conf, METRICS_SOURCE, SOURCE);
LOG_QUEUE.enqueueLog(WAL_FILE, GROUP_ID);
READER = new ReplicationSourceWALReader(FS, conf, LOG_QUEUE, 0, e -> e, SOURCE, GROUP_ID);
}

@AfterClass
public static void tearDown() throws Exception {
READER.setReaderRunning(false);
READER.join();
UTIL.cleanupTestDir();
}

private void test(byte[] content, FSDataOutputStream out) throws Exception {
// minus 15 so the second entry is incomplete
// 15 is a magic number here: we want the reader to parse the first cell but not the second
// cell, especially not the qualifier of the second cell. The value of the second cell is
// 'vvv', which is 3 bytes, plus an 8 byte timestamp, and also the qualifier, family and row
// (which should have been compressed), so 15 is a proper value; of course 14 or 16 could also
// work here.
out.write(content, 0, END_OFFSET_OF_WAL_ENTRIES - 15);
out.hflush();
WAL_LENGTH = END_OFFSET_OF_WAL_ENTRIES - 15;
READER.start();
List<WAL.Entry> entries = new ArrayList<>();
for (;;) {
WALEntryBatch batch = READER.poll(1000);
if (batch == null) {
break;
}
entries.addAll(batch.getWalEntries());
}
// should return all the entries except the last one
assertEquals(Byte.MAX_VALUE, entries.size());
for (int i = 0; i < Byte.MAX_VALUE; i++) {
WAL.Entry entry = entries.get(i);
assertEquals(1, entry.getEdit().size());
Cell cell = entry.getEdit().getCells().get(0);
assertEquals(i, Bytes.toInt(cell.getRowArray(), cell.getRowOffset()));
assertEquals(Bytes.toString(FAMILY),
Bytes.toString(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength()));
assertEquals("qualifier-" + i, Bytes.toString(cell.getQualifierArray(),
cell.getQualifierOffset(), cell.getQualifierLength()));
assertEquals("v-" + i,
Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()));
}

// confirm that we cannot get the last one since it is incomplete
assertNull(READER.poll(1000));
// write out the remaining 15 bytes
out.write(content, END_OFFSET_OF_WAL_ENTRIES - 15, 15);
out.hflush();
WAL_LENGTH = END_OFFSET_OF_WAL_ENTRIES;

// should get the last entry
WALEntryBatch batch = READER.poll(10000);
assertEquals(1, batch.getNbEntries());
WAL.Entry entry = batch.getWalEntries().get(0);
assertEquals(2, entry.getEdit().size());
Cell cell2 = entry.getEdit().getCells().get(0);
assertEquals(-1, Bytes.toInt(cell2.getRowArray(), cell2.getRowOffset()));
assertEquals(Bytes.toString(FAMILY),
Bytes.toString(cell2.getFamilyArray(), cell2.getFamilyOffset(), cell2.getFamilyLength()));
assertEquals("qualifier", Bytes.toString(cell2.getQualifierArray(), cell2.getQualifierOffset(),
cell2.getQualifierLength()));
assertEquals("vv",
Bytes.toString(cell2.getValueArray(), cell2.getValueOffset(), cell2.getValueLength()));

Cell cell3 = entry.getEdit().getCells().get(1);
assertEquals(-1, Bytes.toInt(cell3.getRowArray(), cell3.getRowOffset()));
assertEquals(Bytes.toString(FAMILY),
Bytes.toString(cell3.getFamilyArray(), cell3.getFamilyOffset(), cell3.getFamilyLength()));
assertEquals("qualifier-1", Bytes.toString(cell3.getQualifierArray(),
cell3.getQualifierOffset(), cell3.getQualifierLength()));
assertEquals("vvv",
Bytes.toString(cell3.getValueArray(), cell3.getValueOffset(), cell3.getValueLength()));
}

@Test
public void testReset() throws Exception {
byte[] content;
try (FSDataInputStream in = FS.open(TEMPLATE_WAL_FILE)) {
content = ByteStreams.toByteArray(in);
}
try (FSDataOutputStream out = FS.create(WAL_FILE)) {
test(content, out);
}
}
}
