Skip to content

Commit

Permalink
Introduce ClickHouseInputStream for tiny reads
Browse files Browse the repository at this point in the history
  • Loading branch information
zhicwu committed Nov 29, 2021
1 parent cdbab02 commit 1ee8668
Show file tree
Hide file tree
Showing 23 changed files with 1,109 additions and 579 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,27 @@ public static long between(long value, long minValue, long maxValue) {
return between(value, DEFAULT_NAME, minValue, maxValue);
}

/**
* Checks if the given {@code value} is between {@code minValue} and
* {@code maxValue} inclusive and throws a customized
* {@link IllegalArgumentException} if it is NOT.
*
* @param value the value to check
* @param name name of the value
* @param minValue minimum value to compare with
* @param maxValue maximum value to compare with
* @return the exact same value
* @throws IllegalArgumentException if the {@code value} is NOT between
* {@code minValue} and {@code maxValue}
*/
public static int between(byte value, String name, byte minValue, byte maxValue) {
if (value < minValue || value > maxValue) {
throw newException(ERR_SHOULD_BETWEEN, name, value, minValue, maxValue);
}

return value;
}

/**
* Checks if the given {@code value} is between {@code minValue} and
* {@code maxValue} inclusive and throws a customized
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package com.clickhouse.client;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collections;
Expand Down Expand Up @@ -34,7 +33,7 @@ protected static <T extends ClickHouseValue> void buildMappings(
}

protected final ClickHouseConfig config;
protected final InputStream input;
protected final ClickHouseInputStream input;
protected final OutputStream output;
protected final List<ClickHouseColumn> columns;
protected final Map<String, Object> settings;
Expand All @@ -60,7 +59,7 @@ protected static <T extends ClickHouseValue> void buildMappings(
* @param settings nullable settings
* @throws IOException when failed to read columns from input stream
*/
protected ClickHouseDataProcessor(ClickHouseConfig config, InputStream input, OutputStream output,
protected ClickHouseDataProcessor(ClickHouseConfig config, ClickHouseInputStream input, OutputStream output,
List<ClickHouseColumn> columns, Map<String, Object> settings) throws IOException {
this.config = ClickHouseChecker.nonNull(config, "config");
if (input == null && output == null) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package com.clickhouse.client;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -40,8 +39,8 @@ public static ClickHouseDataStreamFactory getInstance() {
* @return data processor
* @throws IOException when failed to read columns from input stream
*/
public ClickHouseDataProcessor getProcessor(ClickHouseConfig config, InputStream input, OutputStream output,
Map<String, Object> settings, List<ClickHouseColumn> columns) throws IOException {
public ClickHouseDataProcessor getProcessor(ClickHouseConfig config, ClickHouseInputStream input,
OutputStream output, Map<String, Object> settings, List<ClickHouseColumn> columns) throws IOException {
ClickHouseFormat format = ClickHouseChecker.nonNull(config, "config").getFormat();
ClickHouseDataProcessor processor;
if (ClickHouseFormat.RowBinary == format || ClickHouseFormat.RowBinaryWithNamesAndTypes == format) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package com.clickhouse.client;

import java.io.IOException;
import java.io.InputStream;

/**
* Functional interface for deserialization.
Expand All @@ -19,5 +18,6 @@ public interface ClickHouseDeserializer<T extends ClickHouseValue> {
* @return deserialized value which might be the same instance as {@code ref}
* @throws IOException when failed to read data from input stream
*/
T deserialize(T ref, ClickHouseConfig config, ClickHouseColumn column, InputStream input) throws IOException;
T deserialize(T ref, ClickHouseConfig config, ClickHouseColumn column, ClickHouseInputStream input)
throws IOException;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package com.clickhouse.client;

import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;

/**
* Extended input stream.
*/
public abstract class ClickHouseInputStream extends InputStream {
static final class SimpleInputStream extends ClickHouseInputStream {
private final InputStream in;

private boolean closed;

protected SimpleInputStream(InputStream input) {
this.in = ClickHouseChecker.nonNull(input, "InputStream");
this.closed = false;
}

@Override
public int available() throws IOException {
return in.available();
}

@Override
public byte readByte() throws IOException {
int v = in.read();
if (v == -1) {
close();
throw new EOFException();
}

return (byte) v;
}

@Override
public boolean isClosed() {
return closed;
}

@Override
public void close() throws IOException {
in.close();
closed = true;
}

@Override
public int read() throws IOException {
return in.read();
}

@Override
public int read(byte[] b, int off, int len) throws IOException {
return in.read(b, off, len);
}

@Override
public long skip(long n) throws IOException {
return in.skip(n);
}
}

public static ClickHouseInputStream of(InputStream input) {
return input instanceof ClickHouseInputStream ? (ClickHouseInputStream) input : new SimpleInputStream(input);
}

/**
* Reads an unsigned byte from the input stream.
*
* @return unsigned byte
* @throws IOException when failed to read value from input stream or reached
* end of the stream
*/
public int readUnsignedByte() throws IOException {
return 0xFF & readByte();
}

/**
* Reads one single byte from the input stream. This is faster than
* {@link #read()}.
*
* @return byte value if present
* @throws IOException when failed to read value from input stream or reached
* end of the stream
*/
public abstract byte readByte() throws IOException;

/**
* Checks if the input stream has been closed or not.
*
* @return true if the input stream has been closed; false otherwise
*/
public abstract boolean isClosed();
}
Loading

0 comments on commit 1ee8668

Please sign in to comment.