diff --git a/clickhouse-client/src/main/java/com/clickhouse/client/AbstractClient.java b/clickhouse-client/src/main/java/com/clickhouse/client/AbstractClient.java new file mode 100644 index 000000000..c2ba204fd --- /dev/null +++ b/clickhouse-client/src/main/java/com/clickhouse/client/AbstractClient.java @@ -0,0 +1,213 @@ +package com.clickhouse.client; + +import java.util.concurrent.CompletionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import com.clickhouse.client.logging.Logger; +import com.clickhouse.client.logging.LoggerFactory; + +/** + * Base class for implementing a thread-safe ClickHouse client. It uses + * {@link ReadWriteLock} to manage access to underlying connection. + */ +public abstract class AbstractClient implements ClickHouseClient { + private static final Logger log = LoggerFactory.getLogger(AbstractClient.class); + + private boolean initialized; + + private ExecutorService executor; + private ClickHouseConfig config; + private ClickHouseNode server; + private T connection; + + protected final ReadWriteLock lock = new ReentrantReadWriteLock(); + + private void ensureInitialized() { + if (!initialized) { + throw new IllegalStateException("Please initialize the client first"); + } + } + + // just for testing purpose + final boolean isInitialized() { + return initialized; + } + + /** + * Gets executor service for this client. + * + * @return executor service + */ + protected final ExecutorService getExecutor() { + lock.readLock().lock(); + try { + ensureInitialized(); + return executor; + } finally { + lock.readLock().unlock(); + } + } + + /** + * Gets current server. + * + * @return current server, could be null + * @throws IllegalStateException when the client is either closed or not + * initialized + */ + protected final ClickHouseNode getServer() { + lock.readLock().lock(); + try { + ensureInitialized(); + return server; + } finally { + lock.readLock().unlock(); + } + } + + /** + * Creates a new connection. This method will be called from + * {@link #getConnection(ClickHouseRequest)} as needed. + * + * @param config non-null configuration + * @param server non-null server + * @return new connection + * @throws CompletionException when error occured + */ + protected abstract T newConnection(ClickHouseConfig config, ClickHouseNode server); + + /** + * Closes a connection. This method will be called from {@link #close()}. + * + * @param connection connection to close + * @param force whether force to close the connection or not + */ + protected abstract void closeConnection(T connection, boolean force); + + /** + * Gets a connection according to the given request. + * + * @param request non-null request + * @return non-null connection + * @throws CompletionException when error occured + */ + protected final T getConnection(ClickHouseRequest request) { + ClickHouseNode newNode = ClickHouseChecker.nonNull(request, "request").getServer(); + + lock.readLock().lock(); + try { + ensureInitialized(); + if (connection != null && newNode.equals(server)) { + return connection; + } + } finally { + lock.readLock().unlock(); + } + + lock.writeLock().lock(); + try { + if (connection != null) { + log.debug("Closing connection: %s", connection); + closeConnection(connection, false); + } + + server = newNode; + log.debug("Connecting to: %s", newNode); + connection = newConnection(config, server); + log.debug("Connection established: %s", connection); + + return connection; + } finally { + lock.writeLock().unlock(); + } + } + + @Override + public final ClickHouseConfig getConfig() { + lock.readLock().lock(); + try { + ensureInitialized(); + return config; + } finally { + lock.readLock().unlock(); + } + } + + @Override + public void init(ClickHouseConfig config) { + ClickHouseChecker.nonNull(config, "config"); + + lock.writeLock().lock(); + try { + this.config = config; + if (this.executor == null) { // only initialize once + int threads = config.getMaxThreadsPerClient(); + this.executor = threads <= 0 ? ClickHouseClient.getExecutorService() + : ClickHouseUtils.newThreadPool(getClass().getSimpleName(), threads, + config.getMaxQueuedRequests()); + } + + initialized = true; + } finally { + lock.writeLock().unlock(); + } + } + + @Override + public final void close() { + lock.readLock().lock(); + try { + if (!initialized) { + return; + } + } finally { + lock.readLock().unlock(); + } + + lock.writeLock().lock(); + try { + server = null; + + if (executor != null) { + executor.shutdown(); + } + + if (connection != null) { + closeConnection(connection, false); + } + + // shutdown* won't shutdown commonPool, so awaitTermination will always time out + // on the other hand, for a client-specific thread pool, we'd better shut it + // down for real + if (executor != null && config.getMaxThreadsPerClient() > 0 + && !executor.awaitTermination(config.getConnectionTimeout(), TimeUnit.MILLISECONDS)) { + executor.shutdownNow(); + } + + executor = null; + connection = null; + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } catch (RuntimeException e) { + log.warn("Exception occurred when closing client", e); + } finally { + initialized = false; + try { + if (connection != null) { + closeConnection(connection, true); + } + + if (executor != null) { + executor.shutdownNow(); + } + } finally { + executor = null; + connection = null; + lock.writeLock().unlock(); + } + } + } +} diff --git a/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseClient.java b/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseClient.java index 7dd0eeca0..e1fbb5fca 100644 --- a/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseClient.java +++ b/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseClient.java @@ -76,6 +76,8 @@ static CompletableFuture submit(Callable task) { return (boolean) ClickHouseDefaults.ASYNC.getEffectiveDefaultValue() ? CompletableFuture.supplyAsync(() -> { try { return task.call(); + } catch (CompletionException e) { + throw e; } catch (Exception e) { throw new CompletionException(e); } @@ -83,7 +85,7 @@ static CompletableFuture submit(Callable task) { } catch (CompletionException e) { throw e; } catch (Exception e) { - throw new CompletionException(e); + throw new CompletionException(e.getCause() != null ? e.getCause() : e); } } @@ -343,6 +345,11 @@ static CompletableFuture> send(ClickHouseNode se list.add(resp.getSummary()); } } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw ClickHouseExceptionSpecifier.specify(e, theServer); + } catch (ExecutionException e) { + throw ClickHouseExceptionSpecifier.handle(e, theServer); } return list; @@ -375,6 +382,11 @@ static CompletableFuture send(ClickHouseNode server, ClickHouseResponse resp = client.connect(theServer).format(ClickHouseFormat.RowBinary).query(sql) .params(params).execute().get()) { return resp.getSummary(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw ClickHouseExceptionSpecifier.specify(e, theServer); + } catch (ExecutionException e) { + throw ClickHouseExceptionSpecifier.handle(e, theServer); } }); } @@ -461,6 +473,11 @@ static CompletableFuture> send(ClickHouseNode se list.add(resp.getSummary()); } } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw ClickHouseExceptionSpecifier.specify(e, theServer); + } catch (ExecutionException e) { + throw ClickHouseExceptionSpecifier.handle(e, theServer); } return list; @@ -507,6 +524,11 @@ static CompletableFuture> send(ClickHouseNode se list.add(resp.getSummary()); } } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw ClickHouseExceptionSpecifier.specify(e, theServer); + } catch (ExecutionException e) { + throw ClickHouseExceptionSpecifier.handle(e, theServer); } return list; @@ -556,9 +578,9 @@ default ClickHouseRequest connect(Function execute(ClickHouseRequest request) throws ClickHouseException; + CompletableFuture execute(ClickHouseRequest request); /** * Gets the immutable configuration associated with this client. In most cases diff --git a/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseRecord.java b/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseRecord.java index 2e88001a2..9d50684d7 100644 --- a/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseRecord.java +++ b/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseRecord.java @@ -10,13 +10,6 @@ * whole data set. */ public interface ClickHouseRecord extends Iterable, Serializable { - /** - * Gets size of the record. - * - * @return size of the record - */ - int size(); - /** * Gets deserialized value wrapped in an object using column index. Please avoid * to cache the wrapper object, as it's reused among records for memory @@ -64,4 +57,11 @@ public ClickHouseValue next() { } }; } + + /** + * Gets size of the record. + * + * @return size of the record + */ + int size(); } diff --git a/clickhouse-client/src/main/java/com/clickhouse/client/config/ClickHouseClientOption.java b/clickhouse-client/src/main/java/com/clickhouse/client/config/ClickHouseClientOption.java index 133efefed..bd561f257 100644 --- a/clickhouse-client/src/main/java/com/clickhouse/client/config/ClickHouseClientOption.java +++ b/clickhouse-client/src/main/java/com/clickhouse/client/config/ClickHouseClientOption.java @@ -69,7 +69,7 @@ public enum ClickHouseClientOption implements ClickHouseConfigOption { /** * Maximum size of thread pool for each client. */ - MAX_THREADS_PER_CLIENT("max_threads_per_client", 1, + MAX_THREADS_PER_CLIENT("max_threads_per_client", 0, "Size of thread pool for each client instance, 0 or negative number means the client will use shared thread pool."), /** * Whether to enable retry. diff --git a/clickhouse-client/src/main/java/com/clickhouse/client/data/ClickHouseSimpleRecord.java b/clickhouse-client/src/main/java/com/clickhouse/client/data/ClickHouseSimpleRecord.java index 699de2c4a..a687da098 100644 --- a/clickhouse-client/src/main/java/com/clickhouse/client/data/ClickHouseSimpleRecord.java +++ b/clickhouse-client/src/main/java/com/clickhouse/client/data/ClickHouseSimpleRecord.java @@ -1,5 +1,6 @@ package com.clickhouse.client.data; +import java.util.Collections; import java.util.List; import com.clickhouse.client.ClickHouseColumn; @@ -12,8 +13,10 @@ * which is simply a combination of list of columns and array of values. */ public class ClickHouseSimpleRecord implements ClickHouseRecord { - protected final List columns; + public static final ClickHouseSimpleRecord EMPTY = new ClickHouseSimpleRecord(Collections.emptyList(), + new ClickHouseValue[0]); + private final List columns; private ClickHouseValue[] values; /** @@ -29,6 +32,8 @@ public static ClickHouseRecord of(List columns, ClickHouseValu } else if (columns.size() != values.length) { throw new IllegalArgumentException(ClickHouseUtils.format( "Mismatched count: we have %d columns but we got %d values", columns.size(), values.length)); + } else if (values.length == 0) { + return EMPTY; } return new ClickHouseSimpleRecord(columns, values); @@ -39,6 +44,10 @@ protected ClickHouseSimpleRecord(List columns, ClickHouseValue this.values = values; } + protected List getColumns() { + return columns; + } + protected ClickHouseValue[] getValues() { return values; } diff --git a/clickhouse-client/src/main/java/com/clickhouse/client/data/ClickHouseSimpleResponse.java b/clickhouse-client/src/main/java/com/clickhouse/client/data/ClickHouseSimpleResponse.java index b910dc814..aa521ab7e 100644 --- a/clickhouse-client/src/main/java/com/clickhouse/client/data/ClickHouseSimpleResponse.java +++ b/clickhouse-client/src/main/java/com/clickhouse/client/data/ClickHouseSimpleResponse.java @@ -13,21 +13,27 @@ import com.clickhouse.client.ClickHouseValues; /** - * A simple response built on top of two lists: columns and values. + * A simple response built on top of two lists: columns and records. */ public class ClickHouseSimpleResponse implements ClickHouseResponse { + public static final ClickHouseSimpleResponse EMPTY = new ClickHouseSimpleResponse(Collections.emptyList(), + new ClickHouseValue[0][]); + private final List columns; private final List records; /** * Creates a response object using columns definition and raw values. * - * @param structure column definition - * @param values non-null raw values + * @param columns list of columns + * @param values raw values, which may or may not be null * @return response object */ - public static ClickHouseResponse of(String structure, Object[][] values) { - List columns = ClickHouseColumn.parse(structure); + public static ClickHouseResponse of(List columns, Object[][] values) { + if (columns == null || columns.isEmpty()) { + return EMPTY; + } + int size = columns.size(); int len = values != null ? values.length : 0; diff --git a/clickhouse-client/src/main/java/com/clickhouse/client/data/ClickHouseStreamResponse.java b/clickhouse-client/src/main/java/com/clickhouse/client/data/ClickHouseStreamResponse.java index b33c4f472..3fb13dc47 100644 --- a/clickhouse-client/src/main/java/com/clickhouse/client/data/ClickHouseStreamResponse.java +++ b/clickhouse-client/src/main/java/com/clickhouse/client/data/ClickHouseStreamResponse.java @@ -31,6 +31,25 @@ public class ClickHouseStreamResponse implements ClickHouseResponse { protected static final List defaultTypes = Collections .singletonList(ClickHouseColumn.of("results", "Nullable(String)")); + public static ClickHouseResponse of(ClickHouseConfig config, InputStream input) throws IOException { + return of(config, input, null, null); + } + + public static ClickHouseResponse of(ClickHouseConfig config, InputStream input, Map settings) + throws IOException { + return of(config, input, settings, null); + } + + public static ClickHouseResponse of(ClickHouseConfig config, InputStream input, List columns) + throws IOException { + return of(config, input, null, columns); + } + + public static ClickHouseResponse of(ClickHouseConfig config, InputStream input, Map settings, + List columns) throws IOException { + return new ClickHouseStreamResponse(config, input, settings, columns); + } + protected final ClickHouseConfig config; protected final transient InputStream input; protected final transient ClickHouseDataProcessor processor; @@ -38,7 +57,7 @@ public class ClickHouseStreamResponse implements ClickHouseResponse { private boolean isClosed; - protected ClickHouseStreamResponse(ClickHouseConfig config, Map settings, InputStream input, + protected ClickHouseStreamResponse(ClickHouseConfig config, InputStream input, Map settings, List columns) throws IOException { if (config == null || input == null) { throw new IllegalArgumentException("Non-null configuration and input stream are required"); diff --git a/clickhouse-client/src/main/java/com/clickhouse/client/exception/ClickHouseExceptionSpecifier.java b/clickhouse-client/src/main/java/com/clickhouse/client/exception/ClickHouseExceptionSpecifier.java index 78f0f4eab..2fb5ef7ef 100644 --- a/clickhouse-client/src/main/java/com/clickhouse/client/exception/ClickHouseExceptionSpecifier.java +++ b/clickhouse-client/src/main/java/com/clickhouse/client/exception/ClickHouseExceptionSpecifier.java @@ -25,6 +25,7 @@ private ClickHouseExceptionSpecifier() { * * @param e ExecutionException * @param server server + * @return exception */ public static ClickHouseException handle(ExecutionException e, ClickHouseNode server) { Throwable cause = e.getCause(); diff --git a/clickhouse-client/src/main/java9/module-info.java b/clickhouse-client/src/main/java9/module-info.java index 378c0b257..36ebb9a6f 100644 --- a/clickhouse-client/src/main/java9/module-info.java +++ b/clickhouse-client/src/main/java9/module-info.java @@ -6,16 +6,7 @@ exports com.clickhouse.client.config; exports com.clickhouse.client.data; exports com.clickhouse.client.exception; - - exports com.clickhouse.client.logging to - com.clickhouse.client.http, - com.clickhouse.client.grpc, - com.clickhouse.client.mysql, - com.clickhouse.client.postgresql, - // native is a reserved keyword :< - com.clickhouse.client.tcp, - com.clickhouse.jdbc, - ru.yandex.clickhouse; + exports com.clickhouse.client.logging; requires static java.logging; requires static com.google.gson; diff --git a/clickhouse-client/src/test/java/com/clickhouse/client/AbstractClientTest.java b/clickhouse-client/src/test/java/com/clickhouse/client/AbstractClientTest.java new file mode 100644 index 000000000..4ba212443 --- /dev/null +++ b/clickhouse-client/src/test/java/com/clickhouse/client/AbstractClientTest.java @@ -0,0 +1,170 @@ +package com.clickhouse.client; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.testng.Assert; +import org.testng.annotations.Test; + +public class AbstractClientTest { + static class SimpleClient extends AbstractClient { + @Override + public CompletableFuture execute(ClickHouseRequest request) { + return null; + } + + @Override + protected Object[] newConnection(ClickHouseConfig config, ClickHouseNode server) { + return new Object[] { config, server }; + } + + @Override + protected void closeConnection(Object[] connection, boolean force) { + connection[0] = null; + connection[1] = null; + } + } + + @Test(groups = { "unit" }) + public void testClose() { + SimpleClient sc = new SimpleClient(); + Assert.assertFalse(sc.isInitialized()); + sc.close(); + Assert.assertFalse(sc.isInitialized()); + + sc.init(new ClickHouseConfig()); + Assert.assertNotNull(sc.getExecutor()); + Assert.assertTrue(sc.isInitialized()); + Assert.assertNotNull(sc.getConfig()); + Assert.assertNull(sc.getServer()); + sc.close(); + Assert.assertFalse(sc.isInitialized()); + Assert.assertThrows(IllegalStateException.class, () -> sc.getConfig()); + Assert.assertThrows(IllegalStateException.class, () -> sc.getExecutor()); + Assert.assertThrows(IllegalStateException.class, () -> sc.getServer()); + + SimpleClient client = new SimpleClient(); + client.init(new ClickHouseConfig()); + ClickHouseRequest req = client.connect(ClickHouseNode.builder().build()); + ClickHouseConfig config = new ClickHouseConfig(); + sc.init(config); + Assert.assertNotNull(sc.getExecutor()); + Assert.assertTrue(sc.isInitialized()); + Assert.assertNotNull(sc.getConfig()); + Assert.assertNull(sc.getServer()); + Assert.assertEquals(sc.getConnection(req), new Object[] { config, req.getServer() }); + sc.close(); + Assert.assertFalse(sc.isInitialized()); + Assert.assertThrows(IllegalStateException.class, () -> sc.getConfig()); + Assert.assertThrows(IllegalStateException.class, () -> sc.getConnection(req)); + Assert.assertThrows(IllegalStateException.class, () -> sc.getExecutor()); + Assert.assertThrows(IllegalStateException.class, () -> sc.getServer()); + } + + @Test(groups = { "unit" }) + public void testCloseRunningClient() throws InterruptedException { + SimpleClient client = new SimpleClient(); + client.init(new ClickHouseConfig()); + ClickHouseRequest req = client.connect(ClickHouseNode.builder().build()); + + CountDownLatch latch = new CountDownLatch(1); + new Thread(() -> { + try { + Object[] conn = client.getConnection(req); + Thread.sleep(1000L); + client.close(); + latch.countDown(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } catch (Exception e) { + throw new IllegalStateException(e); + } + }).start(); + Assert.assertTrue(client.isInitialized()); + Assert.assertTrue(latch.await(5000L, TimeUnit.MILLISECONDS)); + Assert.assertFalse(client.isInitialized()); + } + + @Test(groups = { "unit" }) + public void testGetAndCloseConnection() { + SimpleClient client = new SimpleClient(); + client.init(new ClickHouseConfig()); + ClickHouseRequest req = client.connect(ClickHouseNode.builder().build()); + + SimpleClient sc = new SimpleClient(); + sc.init(new ClickHouseConfig()); + Assert.assertEquals(sc.getConnection(req), new Object[] { sc.getConfig(), req.getServer() }); + + req = client.connect(ClickHouseNode.of("127.0.0.1", ClickHouseProtocol.GRPC, 9100, "test")); + Object[] conn = sc.getConnection(req); + Assert.assertEquals(conn, new Object[] { sc.getConfig(), req.getServer() }); + sc.close(); + Assert.assertNull(conn[0]); + Assert.assertNull(conn[1]); + } + + @Test(groups = { "unit" }) + public void testInit() { + SimpleClient client = new SimpleClient(); + client.init(new ClickHouseConfig()); + ClickHouseRequest req = client.connect(ClickHouseNode.builder().build()); + + SimpleClient sc = new SimpleClient(); + Assert.assertFalse(sc.isInitialized()); + Assert.assertThrows(IllegalStateException.class, () -> sc.connect(ClickHouseNode.builder().build())); + Assert.assertThrows(IllegalStateException.class, () -> sc.getConfig()); + Assert.assertThrows(IllegalStateException.class, () -> sc.getConnection(req)); + Assert.assertThrows(IllegalStateException.class, () -> sc.getExecutor()); + Assert.assertThrows(IllegalStateException.class, () -> sc.getServer()); + + ClickHouseConfig config = new ClickHouseConfig(); + sc.init(config); + Assert.assertEquals(sc.getExecutor(), ClickHouseClient.getExecutorService()); + Assert.assertTrue(sc.isInitialized()); + Assert.assertTrue(sc.getConfig() == config); + Assert.assertNull(sc.getServer()); + Assert.assertEquals(sc.getConnection(req), new Object[] { config, req.getServer() }); + Assert.assertEquals(sc.getServer(), req.getServer()); + + ClickHouseConfig newConfig = new ClickHouseConfig(); + sc.init(newConfig); + Assert.assertEquals(sc.getExecutor(), ClickHouseClient.getExecutorService()); + Assert.assertTrue(sc.isInitialized()); + Assert.assertTrue(sc.getConfig() != config); + Assert.assertEquals(sc.getConnection(req), new Object[] { config, req.getServer() }); + Assert.assertEquals(sc.getServer(), req.getServer()); + } + + @Test(groups = { "unit" }) + public void testSwitchNode() throws InterruptedException { + ClickHouseConfig config = new ClickHouseConfig(); + SimpleClient client = new SimpleClient(); + client.init(config); + ClickHouseRequest req1 = client.connect(ClickHouseNode.builder().build()); + ClickHouseRequest req2 = client + .connect(ClickHouseNode.of("127.0.0.1", ClickHouseProtocol.GRPC, 9100, "test")); + + Object[] conn1 = client.getConnection(req1); + CountDownLatch latch = new CountDownLatch(1); + new Thread(() -> { + try { + Thread.sleep(1000L); + Object[] conn2 = client.getConnection(req2); + latch.countDown(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } catch (Exception e) { + throw new IllegalStateException(e); + } + }).start(); + Assert.assertEquals(conn1, new Object[] { config, req1.getServer() }); + Assert.assertTrue(latch.await(5000L, TimeUnit.MILLISECONDS)); + Assert.assertTrue(client.isInitialized()); + Assert.assertNull(conn1[0]); + Assert.assertNull(conn1[1]); + Object[] conn2 = client.getConnection(req2); + Assert.assertTrue(conn1 != conn2); + Assert.assertEquals(conn2, new Object[] { config, req2.getServer() }); + } +} diff --git a/clickhouse-client/src/test/java/com/clickhouse/client/ClickHouseTestClient.java b/clickhouse-client/src/test/java/com/clickhouse/client/ClickHouseTestClient.java index 3f3544cc6..7484565b3 100644 --- a/clickhouse-client/src/test/java/com/clickhouse/client/ClickHouseTestClient.java +++ b/clickhouse-client/src/test/java/com/clickhouse/client/ClickHouseTestClient.java @@ -2,8 +2,6 @@ import java.util.concurrent.CompletableFuture; -import com.clickhouse.client.exception.ClickHouseException; - public class ClickHouseTestClient implements ClickHouseClient { private ClickHouseConfig clientConfig; @@ -13,7 +11,7 @@ public boolean accept(ClickHouseProtocol protocol) { } @Override - public CompletableFuture execute(ClickHouseRequest request) throws ClickHouseException { + public CompletableFuture execute(ClickHouseRequest request) { return CompletableFuture.supplyAsync(() -> null); } diff --git a/clickhouse-client/src/test/java/com/clickhouse/client/data/ClickHouseSimpleRecordTest.java b/clickhouse-client/src/test/java/com/clickhouse/client/data/ClickHouseSimpleRecordTest.java index c8bfa1771..653fbf9dd 100644 --- a/clickhouse-client/src/test/java/com/clickhouse/client/data/ClickHouseSimpleRecordTest.java +++ b/clickhouse-client/src/test/java/com/clickhouse/client/data/ClickHouseSimpleRecordTest.java @@ -19,7 +19,7 @@ public void testNullInput() { () -> ClickHouseSimpleRecord.of(Collections.emptyList(), null)); ClickHouseSimpleRecord record = new ClickHouseSimpleRecord(null, null); - Assert.assertNull(record.columns); + Assert.assertNull(record.getColumns()); Assert.assertNull(record.getValues()); } @@ -30,7 +30,7 @@ public void testMismatchedColumnsAndValues() { ClickHouseSimpleRecord record = new ClickHouseSimpleRecord(Arrays.asList(ClickHouseColumn.of("a", "String")), new ClickHouseValue[0]); - Assert.assertEquals(record.columns, Arrays.asList(ClickHouseColumn.of("a", "String"))); + Assert.assertEquals(record.getColumns(), Arrays.asList(ClickHouseColumn.of("a", "String"))); Assert.assertEquals(record.getValues(), new ClickHouseValue[0]); } @@ -38,7 +38,7 @@ public void testMismatchedColumnsAndValues() { public void testGetValueByIndex() { ClickHouseSimpleRecord record = new ClickHouseSimpleRecord(ClickHouseColumn.parse("a String, b UInt32"), new ClickHouseValue[] { ClickHouseStringValue.of("123"), ClickHouseLongValue.of(1L, true) }); - Assert.assertEquals(record.columns, ClickHouseColumn.parse("a String, b UInt32")); + Assert.assertEquals(record.getColumns(), ClickHouseColumn.parse("a String, b UInt32")); Assert.assertEquals(record.getValues(), new ClickHouseValue[] { ClickHouseStringValue.of("123"), ClickHouseLongValue.of(1L, true) }); @@ -64,7 +64,7 @@ public void testGetValueByName() { ClickHouseColumn.parse("`a One` String, `x木哈哈x` UInt32, test Nullable(String)"), new ClickHouseValue[] { ClickHouseStringValue.of("123"), ClickHouseLongValue.of(1L, true), ClickHouseStringValue.ofNull() }); - Assert.assertEquals(record.columns, + Assert.assertEquals(record.getColumns(), ClickHouseColumn.parse("`a One` String, `x木哈哈x` UInt32, test Nullable(String)")); Assert.assertEquals(record.getValues(), new ClickHouseValue[] { ClickHouseStringValue.of("123"), ClickHouseLongValue.of(1L, true), ClickHouseStringValue.ofNull() }); diff --git a/clickhouse-client/src/test/java/com/clickhouse/client/data/ClickHouseSimpleResponseTest.java b/clickhouse-client/src/test/java/com/clickhouse/client/data/ClickHouseSimpleResponseTest.java index 7089fae92..ccfdb1121 100644 --- a/clickhouse-client/src/test/java/com/clickhouse/client/data/ClickHouseSimpleResponseTest.java +++ b/clickhouse-client/src/test/java/com/clickhouse/client/data/ClickHouseSimpleResponseTest.java @@ -19,12 +19,13 @@ public void testNullOrEmptyInput() { Assert.assertTrue(((List) nullResp.records()).isEmpty()); Assert.assertThrows(NoSuchElementException.class, () -> nullResp.firstRecord()); - ClickHouseResponse emptyResp1 = ClickHouseSimpleResponse.of("a String", null); + ClickHouseResponse emptyResp1 = ClickHouseSimpleResponse.of(ClickHouseColumn.parse("a String"), null); Assert.assertEquals(emptyResp1.getColumns(), ClickHouseColumn.parse("a String")); Assert.assertTrue(((List) emptyResp1.records()).isEmpty()); Assert.assertThrows(NoSuchElementException.class, () -> emptyResp1.firstRecord()); - ClickHouseResponse emptyResp2 = ClickHouseSimpleResponse.of("a String", new Object[0][]); + ClickHouseResponse emptyResp2 = ClickHouseSimpleResponse.of(ClickHouseColumn.parse("a String"), + new Object[0][]); Assert.assertEquals(emptyResp2.getColumns(), ClickHouseColumn.parse("a String")); Assert.assertTrue(((List) emptyResp2.records()).isEmpty()); Assert.assertThrows(NoSuchElementException.class, () -> emptyResp2.firstRecord()); @@ -32,32 +33,34 @@ public void testNullOrEmptyInput() { @Test(groups = { "unit" }) public void testMismatchedColumnsAndRecords() { - ClickHouseResponse resp = ClickHouseSimpleResponse.of("a Nullable(String), b UInt8, c Array(UInt32)", - new Object[][] { new Object[0], null, new Object[] { 's' }, new Object[] { null, null, null, null }, - new Object[] { "123", 1, new int[] { 3, 2, 1 } } }); + ClickHouseResponse resp = ClickHouseSimpleResponse + .of(ClickHouseColumn.parse("a Nullable(String), b UInt8, c Array(UInt32)"), + new Object[][] { new Object[0], null, new Object[] { 's' }, + new Object[] { null, null, null, null }, + new Object[] { "123", 1, new int[] { 3, 2, 1 } } }); int i = 0; for (ClickHouseRecord r : resp.records()) { switch (i) { - case 0: - case 1: - case 3: - Assert.assertNull(r.getValue(0).asObject()); - Assert.assertNull(r.getValue(1).asObject()); - Assert.assertEquals(r.getValue(2).asObject(), new long[0]); - break; - case 2: - Assert.assertEquals(r.getValue(0).asObject(), "s"); - Assert.assertNull(r.getValue(1).asObject()); - Assert.assertEquals(r.getValue(2).asObject(), new long[0]); - break; - case 4: - Assert.assertEquals(r.getValue(0).asObject(), "123"); - Assert.assertEquals(r.getValue(1).asObject(), (short) 1); - Assert.assertEquals(r.getValue(2).asObject(), new long[] { 3L, 2L, 1L }); - break; - default: - Assert.fail("Should not fail"); - break; + case 0: + case 1: + case 3: + Assert.assertNull(r.getValue(0).asObject()); + Assert.assertNull(r.getValue(1).asObject()); + Assert.assertEquals(r.getValue(2).asObject(), new long[0]); + break; + case 2: + Assert.assertEquals(r.getValue(0).asObject(), "s"); + Assert.assertNull(r.getValue(1).asObject()); + Assert.assertEquals(r.getValue(2).asObject(), new long[0]); + break; + case 4: + Assert.assertEquals(r.getValue(0).asObject(), "123"); + Assert.assertEquals(r.getValue(1).asObject(), (short) 1); + Assert.assertEquals(r.getValue(2).asObject(), new long[] { 3L, 2L, 1L }); + break; + default: + Assert.fail("Should not fail"); + break; } i++; } @@ -65,7 +68,8 @@ public void testMismatchedColumnsAndRecords() { @Test(groups = { "unit" }) public void testFirstRecord() { - ClickHouseResponse resp = ClickHouseSimpleResponse.of("a Nullable(String), b UInt8, c String", + ClickHouseResponse resp = ClickHouseSimpleResponse.of( + ClickHouseColumn.parse("a Nullable(String), b UInt8, c String"), new Object[][] { new Object[] { "aaa", 2, "ccc" }, null }); ClickHouseRecord record = resp.firstRecord(); Assert.assertEquals(record.getValue("A"), ClickHouseStringValue.of("aaa")); @@ -78,30 +82,31 @@ public void testFirstRecord() { @Test(groups = { "unit" }) public void testRecords() { - ClickHouseResponse resp = ClickHouseSimpleResponse.of("a Nullable(String), b UInt8, c String", + ClickHouseResponse resp = ClickHouseSimpleResponse.of( + ClickHouseColumn.parse("a Nullable(String), b UInt8, c String"), new Object[][] { new Object[] { "aaa1", null, "ccc1" }, new Object[] { "aaa2", 2, "ccc2" }, new Object[] { null, 3L, null } }); int i = 0; for (ClickHouseRecord r : resp.records()) { switch (i) { - case 0: - Assert.assertEquals(r.getValue(0).asObject(), "aaa1"); - Assert.assertNull(r.getValue(1).asObject()); - Assert.assertEquals(r.getValue(2).asObject(), "ccc1"); - break; - case 1: - Assert.assertEquals(r.getValue("a").asObject(), "aaa2"); - Assert.assertEquals(r.getValue("B").asObject(), (short) 2); - Assert.assertEquals(r.getValue("c").asObject(), "ccc2"); - break; - case 2: - Assert.assertNull(r.getValue(0).asObject()); - Assert.assertEquals(r.getValue(1).asObject(), (short) 3); - Assert.assertNull(r.getValue(0).asObject()); - break; - default: - Assert.fail("Should not fail"); - break; + case 0: + Assert.assertEquals(r.getValue(0).asObject(), "aaa1"); + Assert.assertNull(r.getValue(1).asObject()); + Assert.assertEquals(r.getValue(2).asObject(), "ccc1"); + break; + case 1: + Assert.assertEquals(r.getValue("a").asObject(), "aaa2"); + Assert.assertEquals(r.getValue("B").asObject(), (short) 2); + Assert.assertEquals(r.getValue("c").asObject(), "ccc2"); + break; + case 2: + Assert.assertNull(r.getValue(0).asObject()); + Assert.assertEquals(r.getValue(1).asObject(), (short) 3); + Assert.assertNull(r.getValue(0).asObject()); + break; + default: + Assert.fail("Should not fail"); + break; } i++; } diff --git a/clickhouse-grpc-client/src/main/java/com/clickhouse/client/grpc/ClickHouseGrpcClient.java b/clickhouse-grpc-client/src/main/java/com/clickhouse/client/grpc/ClickHouseGrpcClient.java index 36cbd0961..e1d5ee71d 100644 --- a/clickhouse-grpc-client/src/main/java/com/clickhouse/client/grpc/ClickHouseGrpcClient.java +++ b/clickhouse-grpc-client/src/main/java/com/clickhouse/client/grpc/ClickHouseGrpcClient.java @@ -8,19 +8,16 @@ import java.util.Map.Entry; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; -import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; import com.google.protobuf.ByteString; import io.grpc.Context; import io.grpc.ManagedChannel; import io.grpc.Status; import io.grpc.StatusException; import io.grpc.stub.StreamObserver; + +import com.clickhouse.client.AbstractClient; import com.clickhouse.client.ClickHouseChecker; -import com.clickhouse.client.ClickHouseClient; import com.clickhouse.client.ClickHouseColumn; import com.clickhouse.client.ClickHouseConfig; import com.clickhouse.client.ClickHouseCredentials; @@ -29,8 +26,8 @@ import com.clickhouse.client.ClickHouseRequest; import com.clickhouse.client.ClickHouseResponse; import com.clickhouse.client.ClickHouseUtils; -import com.clickhouse.client.config.ClickHouseClientOption; import com.clickhouse.client.data.ClickHouseExternalTable; +import com.clickhouse.client.exception.ClickHouseExceptionSpecifier; import com.clickhouse.client.grpc.impl.ClickHouseGrpc; import com.clickhouse.client.grpc.impl.ExternalTable; import com.clickhouse.client.grpc.impl.NameAndType; @@ -40,20 +37,9 @@ import com.clickhouse.client.logging.Logger; import com.clickhouse.client.logging.LoggerFactory; -public class ClickHouseGrpcClient implements ClickHouseClient { +public class ClickHouseGrpcClient extends AbstractClient { private static final Logger log = LoggerFactory.getLogger(ClickHouseGrpcClient.class); - private final ReadWriteLock lock = new ReentrantReadWriteLock(); - - // not going to offload executor to ClickHouseGrpcFuture, as we can manage - // thread pool better here - private final AtomicReference executor = new AtomicReference<>(); - - // do NOT use below members directly without ReadWriteLock - private final AtomicReference config = new AtomicReference<>(); - private final AtomicReference server = new AtomicReference<>(); - private final AtomicReference channel = new AtomicReference<>(); - protected static QueryInfo convert(ClickHouseNode server, ClickHouseRequest request) { ClickHouseConfig config = request.getConfig(); ClickHouseCredentials credentials = server.getCredentials().orElse(config.getDefaultCredentials()); @@ -141,142 +127,31 @@ protected static QueryInfo convert(ClickHouseNode server, ClickHouseRequest r return builder.setQuery(sql).build(); } - protected void fill(ClickHouseRequest request, StreamObserver observer) { - try { - observer.onNext(convert(getServer(), request)); - } finally { - observer.onCompleted(); - } - } - - protected ClickHouseNode getServer() { - lock.readLock().lock(); - try { - return server.get(); - } finally { - lock.readLock().unlock(); - } - } - - protected ManagedChannel getChannel(ClickHouseRequest request) { - boolean prepared = true; - ClickHouseNode newNode = ClickHouseChecker.nonNull(request, "request").getServer(); - - lock.readLock().lock(); - ManagedChannel c = channel.get(); - try { - prepared = c != null && newNode.equals(server.get()); - - if (prepared) { - return c; - } - } finally { - lock.readLock().unlock(); - } - - lock.writeLock().lock(); - c = channel.get(); - try { - // first time? - if (c == null) { - server.set(newNode); - channel.set(c = ClickHouseGrpcChannelFactory.getFactory(getConfig(), newNode).create()); - } else if (!newNode.equals(server.get())) { - log.debug("Shutting down channel: %s", c); - c.shutdownNow(); - - server.set(newNode); - channel.set(c = ClickHouseGrpcChannelFactory.getFactory(getConfig(), newNode).create()); - } - - return c; - } finally { - lock.writeLock().unlock(); - } - } - @Override - public boolean accept(ClickHouseProtocol protocol) { - return ClickHouseProtocol.GRPC == protocol || ClickHouseClient.super.accept(protocol); + protected void closeConnection(ManagedChannel connection, boolean force) { + if (!force) { + connection.shutdown(); + } else { + connection.shutdownNow(); + } } @Override - public ClickHouseConfig getConfig() { - lock.readLock().lock(); - try { - return config.get(); - } finally { - lock.readLock().unlock(); - } + protected ManagedChannel newConnection(ClickHouseConfig config, ClickHouseNode server) { + return ClickHouseGrpcChannelFactory.getFactory(config, server).create(); } - @Override - public void init(ClickHouseConfig config) { - lock.writeLock().lock(); + protected void fill(ClickHouseRequest request, StreamObserver observer) { try { - this.config.set(config); - ClickHouseClient.super.init(config); - if (this.executor.get() == null) { // only initialize once - int threads = config.getMaxThreadsPerClient(); - this.executor.set(threads <= 0 ? ClickHouseClient.getExecutorService() - : ClickHouseUtils.newThreadPool(ClickHouseGrpcClient.class.getSimpleName(), threads, - config.getMaxQueuedRequests())); - } + observer.onNext(convert(getServer(), request)); } finally { - lock.writeLock().unlock(); + observer.onCompleted(); } } @Override - public void close() { - lock.writeLock().lock(); - - ExecutorService s = executor.get(); - ManagedChannel m = channel.get(); - - try { - server.set(null); - - if (s != null) { - s.shutdown(); - executor.set(null); - } - - if (m != null) { - m.shutdown(); - channel.set(null); - } - - ClickHouseConfig c = config.get(); - if (c != null) { - config.set(null); - } - - // shutdown* won't shutdown commonPool, so awaitTermination will always time out - // on the other hand, for a client-specific thread pool, we'd better shut it - // down for real - if (s != null && c != null && c.getMaxThreadsPerClient() > 0 - && !s.awaitTermination((int) c.getOption(ClickHouseClientOption.CONNECTION_TIMEOUT), - TimeUnit.MILLISECONDS)) { - s.shutdownNow(); - } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } catch (RuntimeException e) { - log.warn("Exception occurred when closing client", e); - } finally { - try { - if (m != null) { - m.shutdownNow(); - } - - if (s != null) { - s.shutdownNow(); - } - } finally { - lock.writeLock().unlock(); - } - } + public boolean accept(ClickHouseProtocol protocol) { + return ClickHouseProtocol.GRPC == protocol || super.accept(protocol); } protected CompletableFuture executeAsync(ClickHouseRequest sealedRequest, @@ -290,7 +165,7 @@ protected CompletableFuture executeAsync(ClickHouseRequest requestObserver = stub.executeQueryWithStreamIO(responseObserver); if (sealedRequest.hasInputStream()) { - executor.get().execute(() -> fill(sealedRequest, requestObserver)); + getExecutor().execute(() -> fill(sealedRequest, requestObserver)); } else { fill(sealedRequest, requestObserver); } @@ -310,16 +185,22 @@ protected CompletableFuture executeAsync(ClickHouseRequest executeSync(ClickHouseRequest sealedRequest, @@ -330,8 +211,14 @@ protected CompletableFuture executeSync(ClickHouseRequest // TODO not as elegant as ClickHouseImmediateFuture :< try { Result result = stub.executeQuery(convert(server, sealedRequest)); - return CompletableFuture.completedFuture( - new ClickHouseGrpcResponse(sealedRequest.getConfig(), sealedRequest.getSettings(), result)); + + ClickHouseResponse response = new ClickHouseGrpcResponse(sealedRequest.getConfig(), + sealedRequest.getSettings(), result); + + return result.hasException() + ? CompletableFuture.failedFuture( + ClickHouseExceptionSpecifier.specify(result.getException().getDisplayText(), server)) + : CompletableFuture.completedFuture(response); } catch (IOException e) { throw new CompletionException(e); } @@ -341,7 +228,7 @@ protected CompletableFuture executeSync(ClickHouseRequest public CompletableFuture execute(ClickHouseRequest request) { // sealedRequest is an immutable copy of the original request final ClickHouseRequest sealedRequest = request.seal(); - final ManagedChannel c = getChannel(sealedRequest); + final ManagedChannel c = getConnection(sealedRequest); final ClickHouseNode s = getServer(); return sealedRequest.getConfig().isAsync() ? executeAsync(sealedRequest, c, s) diff --git a/clickhouse-grpc-client/src/main/java/com/clickhouse/client/grpc/ClickHouseGrpcResponse.java b/clickhouse-grpc-client/src/main/java/com/clickhouse/client/grpc/ClickHouseGrpcResponse.java index 7f6b28766..9c1d8f210 100644 --- a/clickhouse-grpc-client/src/main/java/com/clickhouse/client/grpc/ClickHouseGrpcResponse.java +++ b/clickhouse-grpc-client/src/main/java/com/clickhouse/client/grpc/ClickHouseGrpcResponse.java @@ -2,6 +2,7 @@ import java.io.IOException; import java.util.Map; + import com.clickhouse.client.ClickHouseConfig; import com.clickhouse.client.ClickHouseResponseSummary; import com.clickhouse.client.data.ClickHouseStreamResponse; @@ -13,7 +14,7 @@ public class ClickHouseGrpcResponse extends ClickHouseStreamResponse { protected ClickHouseGrpcResponse(ClickHouseConfig config, Map settings, ClickHouseStreamObserver observer) throws IOException { - super(config, settings, observer.getInputStream(), null); + super(config, observer.getInputStream(), settings, null); this.observer = observer; this.result = null; @@ -21,7 +22,7 @@ protected ClickHouseGrpcResponse(ClickHouseConfig config, Map se protected ClickHouseGrpcResponse(ClickHouseConfig config, Map settings, Result result) throws IOException { - super(config, settings, result.getOutput().newInput(), null); + super(config, result.getOutput().newInput(), settings, null); this.observer = null; this.result = result;