Skip to content

Commit

Permalink
Optimize LZ4 input and output stream
Browse files Browse the repository at this point in the history
  • Loading branch information
zhicwu committed Dec 21, 2021
1 parent 1f6666a commit 262b3eb
Show file tree
Hide file tree
Showing 9 changed files with 316 additions and 215 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

import com.clickhouse.client.config.ClickHouseClientOption;

/**
* Extended input stream for read optimization.
*/
Expand Down Expand Up @@ -184,11 +186,17 @@ public long skip(long n) throws IOException {

static final class WrappedInputStream extends ClickHouseInputStream {
private final InputStream in;
private final byte[] buffer;

private int position;
private int limit;
private boolean closed;

WrappedInputStream(InputStream input) {
WrappedInputStream(InputStream input, int bufferSize) {
in = ClickHouseChecker.nonNull(input, "InputStream");
buffer = new byte[bufferSize];
position = 0;
limit = 0;
closed = false;
}

Expand All @@ -198,26 +206,34 @@ private void ensureOpen() throws IOException {
}
}

private int updateBuffer() throws IOException {
if (closed) {
return -1;
}

position = 0;
int count = in.read(buffer);
limit = count > 0 ? count : 0;
return count;
}

@Override
public int available() throws IOException {
return !closed ? in.available() : 0;
return !closed && (position < limit || updateBuffer() > 0) ? limit - position : 0;
}

@Override
public byte readByte() throws IOException {
ensureOpen();

int v = in.read();
if (v != -1) {
return (byte) v;
if (position >= limit && updateBuffer() < 0) {
try {
close();
} catch (IOException e) {
// ignore
}
throw new EOFException();
}

try {
close();
} catch (IOException e) {
// ignore
}
throw new EOFException();
return buffer[position++];
}

@Override
Expand All @@ -232,26 +248,125 @@ public void close() throws IOException {
in.close();
} finally {
closed = true;
position = 0;
limit = 0;
}
}
}

@Override
public int read() throws IOException {
ensureOpen();
return in.read();

int value = -1;
if (position < limit || updateBuffer() > 0) {
value = 0xFF & buffer[position++];
}
return value;
}

@Override
public int read(byte[] b, int off, int len) throws IOException {
if (position >= limit && updateBuffer() < 0) {
return -1;
}

ensureOpen();
return in.read(b, off, len);

int counter = 0;
while (counter < len) {
int size = Math.min(limit - position, len - counter);
System.arraycopy(buffer, position, b, off, size);
position += size;
off += size;
counter += size;

if (position >= limit && updateBuffer() < 0) {
break;
}
}

return counter;
}

@Override
public byte[] readBytes(int length) throws IOException {
if (length <= 0) {
return EMPTY_BYTES;
}

ensureOpen();

byte[] bytes = new byte[length];
int offset = 0;
int counter = 0;
while (counter < length) {
if (position >= limit && updateBuffer() < 0) {
try {
close();
} catch (IOException e) {
// ignore
}
throw counter == 0 ? new EOFException()
: new IOException(ClickHouseUtils
.format("Reached end of input stream after reading %d of %d bytes", counter,
bytes.length));
}

int size = Math.min(limit - position, length - counter);
System.arraycopy(buffer, position, bytes, offset, size);
position += size;
offset += size;
counter += size;
}

return bytes;
}

@Override
public String readString(int byteLength, Charset charset) throws IOException {
ensureOpen();

if (byteLength < 1) {
return "";
}

if (charset == null) {
charset = StandardCharsets.UTF_8;
}

if (limit - position > byteLength) {
int offset = position;
position += byteLength;
return new String(buffer, offset, byteLength, charset);
}

return new String(readBytes(byteLength), charset);
}

@Override
public long skip(long n) throws IOException {
ensureOpen();
return in.skip(n);

long counter = 0L;
while (n > 0L) {
if (position >= limit && updateBuffer() < 0) {
break;
} else {
int remain = limit - position;
if (n > remain) {
n -= remain;
counter += remain;
position = limit;
} else {
counter += n;
position += n;
n = 0L;
}
}
}

return counter;
}
}

Expand All @@ -274,7 +389,21 @@ public static ClickHouseInputStream of(BlockingQueue<ByteBuffer> queue, int time
* {@link ClickHouseInputStream}
*/
public static ClickHouseInputStream of(InputStream input) {
return input instanceof ClickHouseInputStream ? (ClickHouseInputStream) input : new WrappedInputStream(input);
return of(input, (int) ClickHouseClientOption.MAX_BUFFER_SIZE.getDefaultValue());
}

/**
* Wraps the given input stream.
*
* @param input non-null input stream
* @param bufferSize buffer size which is always greater than zero(usually 4096
* or larger)
* @return wrapped input, or the same input if it's instance of
* {@link ClickHouseInputStream}
*/
public static ClickHouseInputStream of(InputStream input, int bufferSize) {
return input instanceof ClickHouseInputStream ? (ClickHouseInputStream) input
: new WrappedInputStream(input, bufferSize);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,15 +91,33 @@ private static <T extends Enum<T>> T toEnum(int value, Class<T> enumType) {
}

public static int toInt32(byte[] bytes, int offset) {
return (0xFF & bytes[offset]) | ((0xFF & bytes[offset + 1]) << 8) | ((0xFF & bytes[offset + 2]) << 16)
| ((0xFF & bytes[offset + 3]) << 24);
return (0xFF & bytes[offset++]) | ((0xFF & bytes[offset++]) << 8) | ((0xFF & bytes[offset++]) << 16)
| ((0xFF & bytes[offset]) << 24);
}

public static long toInt64(byte[] bytes, int offset) {
return (0xFFL & bytes[offset]) | ((0xFFL & bytes[offset + 1]) << 8) | ((0xFFL & bytes[offset + 2]) << 16)
| ((0xFFL & bytes[offset + 3]) << 24) | ((0xFFL & bytes[offset + 4]) << 32)
| ((0xFFL & bytes[offset + 5]) << 40) | ((0xFFL & bytes[offset + 6]) << 48)
| ((0xFFL & bytes[offset + 7]) << 56);
return (0xFFL & bytes[offset++]) | ((0xFFL & bytes[offset++]) << 8) | ((0xFFL & bytes[offset++]) << 16)
| ((0xFFL & bytes[offset++]) << 24) | ((0xFFL & bytes[offset++]) << 32)
| ((0xFFL & bytes[offset++]) << 40) | ((0xFFL & bytes[offset++]) << 48)
| ((0xFFL & bytes[offset]) << 56);
}

public static void setInt32(byte[] bytes, int offset, int value) {
bytes[offset++] = (byte) (0xFF & value);
bytes[offset++] = (byte) (0xFF & (value >> 8));
bytes[offset++] = (byte) (0xFF & (value >> 16));
bytes[offset] = (byte) (0xFF & (value >> 24));
}

public static void setInt64(byte[] bytes, int offset, long value) {
bytes[offset++] = (byte) (0xFF & value);
bytes[offset++] = (byte) (0xFF & (value >> 8));
bytes[offset++] = (byte) (0xFF & (value >> 16));
bytes[offset++] = (byte) (0xFF & (value >> 24));
bytes[offset++] = (byte) (0xFF & (value >> 32));
bytes[offset++] = (byte) (0xFF & (value >> 40));
bytes[offset++] = (byte) (0xFF & (value >> 48));
bytes[offset] = (byte) (0xFF & (value >> 56));
}

/**
Expand Down Expand Up @@ -662,7 +680,7 @@ public static short readUnsignedInt8(ClickHouseInputStream input) throws IOExcep
* end of the stream
*/
public static void writeUnsignedInt8(OutputStream output, int value) throws IOException {
output.write((byte) (ClickHouseChecker.between(value, ClickHouseValues.TYPE_INT, 0, U_INT8_MAX) & 0xFFL));
output.write((byte) (0xFF & ClickHouseChecker.between(value, ClickHouseValues.TYPE_INT, 0, U_INT8_MAX)));
}

/**
Expand All @@ -686,7 +704,7 @@ public static short readInt16(ClickHouseInputStream input) throws IOException {
* end of the stream
*/
public static void writeInt16(OutputStream output, short value) throws IOException {
output.write(new byte[] { (byte) (0xFFL & value), (byte) (0xFFL & (value >> 8)) });
output.write(new byte[] { (byte) (0xFF & value), (byte) (0xFF & (value >> 8)) });
}

/**
Expand Down Expand Up @@ -749,8 +767,8 @@ public static int readInt32(ClickHouseInputStream input) throws IOException {
* end of the stream
*/
public static void writeInt32(OutputStream output, int value) throws IOException {
output.write(new byte[] { (byte) (0xFFL & value), (byte) (0xFFL & (value >> 8)), (byte) (0xFFL & (value >> 16)),
(byte) (0xFFL & (value >> 24)) });
output.write(new byte[] { (byte) (0xFF & value), (byte) (0xFF & (value >> 8)), (byte) (0xFF & (value >> 16)),
(byte) (0xFF & (value >> 24)) });
}

/**
Expand Down Expand Up @@ -799,14 +817,8 @@ public static long readInt64(ClickHouseInputStream input) throws IOException {
* end of the stream
*/
public static void writeInt64(OutputStream output, long value) throws IOException {
value = Long.reverseBytes(value);

byte[] bytes = new byte[8];
for (int i = 7; i >= 0; i--) {
bytes[i] = (byte) (value & 0xFFL);
value >>= 8;
}

setInt64(bytes, 0, value);
output.write(bytes);
}

Expand Down

This file was deleted.

Loading

0 comments on commit 262b3eb

Please sign in to comment.