Skip to content

Commit

Permalink
HBASE-27621 Also clear the Dictionary when resetting when reading com…
Browse files Browse the repository at this point in the history
…pressed WAL file (#5016)

Signed-off-by: Xiaolin Ha <[email protected]>
(cherry picked from commit 833b10e)
  • Loading branch information
Apache9 committed Feb 11, 2023
1 parent af6b63a commit 8df3212
Show file tree
Hide file tree
Showing 7 changed files with 308 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -106,15 +106,15 @@ public void uncompressTags(InputStream src, byte[] dest, int offset, int length)
throws IOException {
int endOffset = offset + length;
while (offset < endOffset) {
byte status = (byte) src.read();
byte status = StreamUtils.readByte(src);
if (status == Dictionary.NOT_IN_DICTIONARY) {
int tagLen = StreamUtils.readRawVarint32(src);
offset = Bytes.putAsShort(dest, offset, tagLen);
IOUtils.readFully(src, dest, offset, tagLen);
tagDict.addEntry(dest, offset, tagLen);
offset += tagLen;
} else {
short dictIdx = StreamUtils.toShort(status, (byte) src.read());
short dictIdx = StreamUtils.toShort(status, StreamUtils.readByte(src));
byte[] entry = tagDict.getEntry(dictIdx);
if (entry == null) {
throw new IOException("Missing dictionary entry for index " + dictIdx);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase.io.util;

import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
Expand Down Expand Up @@ -206,6 +207,22 @@ public static Pair<Integer, Integer> readRawVarint32(ByteBuffer input, int offse
return new Pair<>(result, newOffset - offset);
}

/**
* Read a byte from the given stream using the read method, and throw EOFException if it returns
* -1, like the implementation in {@code DataInputStream}.
* <p/>
* This is useful because casting the return value of read method into byte directly will make us
* lose the ability to check whether there is a byte and its value is -1 or we reach EOF, as
* casting int -1 to byte also returns -1.
*/
public static byte readByte(InputStream in) throws IOException {
int r = in.read();
if (r < 0) {
throw new EOFException();
}
return (byte) r;
}

public static short toShort(byte hi, byte lo) {
short s = (short) (((hi & 0xFF) << 8) | (lo & 0xFF));
Preconditions.checkArgument(s >= 0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,9 @@ static byte[] readCompressed(DataInput in, Dictionary dict) throws IOException {
// if this isn't in the dictionary, we need to add to the dictionary.
byte[] arr = new byte[length];
in.readFully(arr);
if (dict != null) dict.addEntry(arr, 0, length);
if (dict != null) {
dict.addEntry(arr, 0, length);
}
return arr;
} else {
// Status here is the higher-order byte of index of the dictionary entry
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,9 @@ public class ProtobufLogReader extends ReaderBase {
// cell codec classname
private String codecClsName = null;

// a flag indicate that whether we need to reset compression context when seeking back
private boolean resetCompression;

@InterfaceAudience.Private
public long trailerSize() {
if (trailerPresent) {
Expand Down Expand Up @@ -157,6 +160,9 @@ public long getPosition() throws IOException {
@Override
public void reset() throws IOException {
String clsName = initInternal(null, false);
if (resetCompression) {
resetCompression();
}
initAfterCompression(clsName); // We need a new decoder (at least).
}

Expand Down Expand Up @@ -339,6 +345,8 @@ protected boolean readNext(Entry entry) throws IOException {
WALKey.Builder builder = WALKey.newBuilder();
long size = 0;
boolean resetPosition = false;
// by default, we should reset the compression when seeking back after reading something
resetCompression = true;
try {
long available = -1;
try {
Expand All @@ -350,6 +358,14 @@ protected boolean readNext(Entry entry) throws IOException {
// available may be < 0 on local fs for instance. If so, can't depend on it.
available = this.inputStream.available();
if (available > 0 && available < size) {
// if we quit here, we have just read the length, no actual data yet, which means we
// haven't put anything into the compression dictionary yet, so when seeking back to the
// last good position, we do not need to reset compression context.
// This is very useful for saving the extra effort for reconstructing the compression
// dictionary, where we need to read from the beginning instead of just seek to the
// position, as DFSInputStream implement the available method, so in most cases we will
// reach here if there are not enough data.
resetCompression = false;
throw new EOFException("Available stream not enough for edit, "
+ "inputStream.available()= " + this.inputStream.available() + ", " + "entry size= "
+ size + " at offset = " + this.inputStream.getPos());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public abstract class ReaderBase implements AbstractFSWALProvider.Reader {
* Compression context to use reading. Can be null if no compression.
*/
protected CompressionContext compressionContext = null;
protected boolean emptyCompressionContext = true;
private boolean emptyCompressionContext = true;

/**
* Default constructor.
Expand Down Expand Up @@ -121,6 +121,17 @@ public void seek(long pos) throws IOException {
seekOnFs(pos);
}

/**
* Clear the {@link ReaderBase#compressionContext}, and also set {@link #emptyCompressionContext}
* to true, so when seeking, we will try to skip to the position and reconstruct the dictionary.
*/
protected final void resetCompression() {
if (compressionContext != null) {
compressionContext.clear();
emptyCompressionContext = true;
}
}

/**
* Initializes the log reader with a particular stream (may be null). Reader assumes ownership of
* the stream if not null and may use it. Called once.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,18 +197,20 @@ public byte[] uncompress(ByteString data, Enum dictIndex) {

private static byte[] uncompressByteString(ByteString bs, Dictionary dict) throws IOException {
InputStream in = bs.newInput();
byte status = (byte) in.read();
byte status = StreamUtils.readByte(in);
if (status == Dictionary.NOT_IN_DICTIONARY) {
byte[] arr = new byte[StreamUtils.readRawVarint32(in)];
int bytesRead = in.read(arr);
if (bytesRead != arr.length) {
throw new IOException("Cannot read; wanted " + arr.length + ", but got " + bytesRead);
}
if (dict != null) dict.addEntry(arr, 0, arr.length);
if (dict != null) {
dict.addEntry(arr, 0, arr.length);
}
return arr;
} else {
// Status here is the higher-order byte of index of the dictionary entry.
short dictIdx = StreamUtils.toShort(status, (byte) in.read());
short dictIdx = StreamUtils.toShort(status, StreamUtils.readByte(in));
byte[] entry = dict.getEntry(dictIdx);
if (entry == null) {
throw new IOException("Missing dictionary entry for index " + dictIdx);
Expand Down Expand Up @@ -322,7 +324,7 @@ protected Cell parseCell() throws IOException {
}

private int readIntoArray(byte[] to, int offset, Dictionary dict) throws IOException {
byte status = (byte) in.read();
byte status = StreamUtils.readByte(in);
if (status == Dictionary.NOT_IN_DICTIONARY) {
// status byte indicating that data to be read is not in dictionary.
// if this isn't in the dictionary, we need to add to the dictionary.
Expand All @@ -332,7 +334,7 @@ private int readIntoArray(byte[] to, int offset, Dictionary dict) throws IOExcep
return length;
} else {
// the status byte also acts as the higher order byte of the dictionary entry.
short dictIdx = StreamUtils.toShort(status, (byte) in.read());
short dictIdx = StreamUtils.toShort(status, StreamUtils.readByte(in));
byte[] entry = dict.getEntry(dictIdx);
if (entry == null) {
throw new IOException("Missing dictionary entry for index " + dictIdx);
Expand Down
Loading

0 comments on commit 8df3212

Please sign in to comment.