diff --git a/clickhouse-client/src/main/java/com/clickhouse/client/data/ClickHouseEmptyValue.java b/clickhouse-client/src/main/java/com/clickhouse/client/data/ClickHouseEmptyValue.java index 36a847aa7..0cbc58d29 100644 --- a/clickhouse-client/src/main/java/com/clickhouse/client/data/ClickHouseEmptyValue.java +++ b/clickhouse-client/src/main/java/com/clickhouse/client/data/ClickHouseEmptyValue.java @@ -4,6 +4,7 @@ import java.math.BigInteger; import com.clickhouse.client.ClickHouseValue; +import com.clickhouse.client.ClickHouseValues; /** * Wrapper class of Nothing. @@ -26,27 +27,27 @@ public BigInteger asBigInteger() { @Override public byte asByte() { - throw new IllegalStateException("Empty value cannot be converted to byte"); + return (byte) 0; } @Override public double asDouble() { - throw new IllegalStateException("Empty value cannot be converted to double"); + return 0D; } @Override public float asFloat() { - throw new IllegalStateException("Empty value cannot be converted to float"); + return 0F; } @Override public int asInteger() { - throw new IllegalStateException("Empty value cannot be converted to int"); + return 0; } @Override public long asLong() { - throw new IllegalStateException("Empty value cannot be converted to long"); + return 0L; } @Override @@ -56,7 +57,7 @@ public Object asObject() { @Override public short asShort() { - throw new IllegalStateException("Empty value cannot be converted to short"); + return (short) 0; } @Override @@ -76,7 +77,7 @@ public ClickHouseValue resetToNullOrEmpty() { @Override public String toSqlExpression() { - return toString(); + return ClickHouseValues.NULL_EXPR; } @Override diff --git a/clickhouse-client/src/main/java/com/clickhouse/client/data/ClickHouseStreamResponse.java b/clickhouse-client/src/main/java/com/clickhouse/client/data/ClickHouseStreamResponse.java index 38de95503..f11964190 100644 --- a/clickhouse-client/src/main/java/com/clickhouse/client/data/ClickHouseStreamResponse.java +++ b/clickhouse-client/src/main/java/com/clickhouse/client/data/ClickHouseStreamResponse.java @@ -60,7 +60,7 @@ public static ClickHouseResponse of(ClickHouseConfig config, ClickHouseInputStre protected final List columns; protected final ClickHouseResponseSummary summary; - private boolean isClosed; + private boolean closed; protected ClickHouseStreamResponse(ClickHouseConfig config, ClickHouseInputStream input, Map settings, List columns, ClickHouseResponseSummary summary) @@ -91,30 +91,28 @@ protected ClickHouseStreamResponse(ClickHouseConfig config, ClickHouseInputStrea } } this.summary = summary != null ? summary : ClickHouseResponseSummary.EMPTY; - this.isClosed = hasError; + this.closed = hasError; } @Override public boolean isClosed() { - return isClosed; + return closed; } @Override public void close() { - if (input != null) { + try { + log.debug("%d bytes skipped before closing input stream", input.skip(Long.MAX_VALUE)); + } catch (Exception e) { + // ignore + log.debug("Failed to skip reading input stream due to: %s", e.getMessage()); + } finally { try { - log.debug("%d bytes skipped before closing input stream", input.skip(Long.MAX_VALUE)); + input.close(); } catch (Exception e) { - // ignore - log.debug("Failed to skip reading input stream due to: %s", e.getMessage()); - } finally { - try { - input.close(); - } catch (Exception e) { - log.warn("Failed to close input stream", e); - } - isClosed = true; + log.warn("Failed to close input stream", e); } + closed = true; } } diff --git a/clickhouse-client/src/test/java/com/clickhouse/client/data/ClickhouseLZ4InputStreamTest.java b/clickhouse-client/src/test/java/com/clickhouse/client/data/ClickhouseLZ4InputStreamTest.java index 2ca7d2528..7b934e101 100644 --- a/clickhouse-client/src/test/java/com/clickhouse/client/data/ClickhouseLZ4InputStreamTest.java +++ b/clickhouse-client/src/test/java/com/clickhouse/client/data/ClickhouseLZ4InputStreamTest.java @@ -1,12 +1,104 @@ package com.clickhouse.client.data; import org.testng.Assert; +import org.testng.annotations.DataProvider; import org.testng.annotations.Test; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; +import java.io.EOFException; import java.io.IOException; +import java.io.InputStream; +import java.nio.charset.StandardCharsets; public class ClickhouseLZ4InputStreamTest { + private InputStream generateInputStream(String prefix, int samples, StringBuilder builder) throws IOException { + builder.setLength(0); + + byte[] result = null; + try (ByteArrayOutputStream out = new ByteArrayOutputStream(); + ClickHouseLZ4OutputStream lz4Out = new ClickHouseLZ4OutputStream(out, 1024 * 1024)) { + for (int i = 0; i < samples; i++) { + String s = prefix + i; + lz4Out.write(s.getBytes(StandardCharsets.UTF_8)); + builder.append(s); + } + lz4Out.flush(); + result = out.toByteArray(); + // Assert.assertTrue(result.length < builder.length() / 2); + } + + return new ByteArrayInputStream(result); + } + + @DataProvider(name = "prefixes") + private Object[][] getPrefixes() { + return new Object[][] { { "test" }, { "萌萌哒" }, + { "1😂2萌🥘" } }; + }; + + @Test(dataProvider = "prefixes", groups = { "unit" }) + public void testReadByte(String prefix) throws IOException { + StringBuilder builder = new StringBuilder(); + boolean readAll = false; + try (InputStream in = generateInputStream(prefix, 10000, builder); + ClickHouseLZ4InputStream lz4In = new ClickHouseLZ4InputStream(in); + ByteArrayOutputStream out = new ByteArrayOutputStream();) { + try { + while (true) { + out.write(0xFF & lz4In.readByte()); + } + } catch (EOFException e) { + readAll = true; + } + + out.flush(); + + Assert.assertEquals(new String(out.toByteArray(), StandardCharsets.UTF_8), builder.toString()); + } + Assert.assertTrue(readAll, "All bytes should have read without any issue"); + } + + @Test(dataProvider = "prefixes", groups = { "unit" }) + public void testRead(String prefix) throws IOException { + StringBuilder builder = new StringBuilder(); + boolean readAll = false; + try (InputStream in = generateInputStream(prefix, 10000, builder); + ClickHouseLZ4InputStream lz4In = new ClickHouseLZ4InputStream(in); + ByteArrayOutputStream out = new ByteArrayOutputStream();) { + int result = 0; + while ((result = lz4In.read()) != -1) { + out.write(result); + } + out.flush(); + readAll = true; + + Assert.assertEquals(new String(out.toByteArray(), StandardCharsets.UTF_8), builder.toString()); + } + Assert.assertTrue(readAll, "All bytes should have read without any issue"); + } + + @Test(dataProvider = "prefixes", groups = { "unit" }) + public void testReadBytes(String prefix) throws IOException { + StringBuilder builder = new StringBuilder(); + boolean readAll = false; + for (int i = 1; i < 1025; i++) { + byte[] bytes = new byte[i]; + try (InputStream in = generateInputStream(prefix, 10000, builder); + ClickHouseLZ4InputStream lz4In = new ClickHouseLZ4InputStream(in); + ByteArrayOutputStream out = new ByteArrayOutputStream();) { + int result = 0; + while ((result = lz4In.read(bytes)) != -1) { + out.write(bytes, 0, result); + } + out.flush(); + readAll = true; + + Assert.assertEquals(new String(out.toByteArray(), StandardCharsets.UTF_8), builder.toString()); + } + Assert.assertTrue(readAll, "All bytes should have read without any issue"); + } + } + @Test(groups = { "unit" }) public void testLZ4Stream() throws IOException { StringBuilder sb = new StringBuilder();