Skip to content

Commit

Permalink
Enhance exception handling and expose stream response
Browse files Browse the repository at this point in the history
  • Loading branch information
zhicwu committed Oct 21, 2021
1 parent 6d18204 commit 675d382
Show file tree
Hide file tree
Showing 15 changed files with 553 additions and 231 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,213 @@
package com.clickhouse.client;

import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import com.clickhouse.client.logging.Logger;
import com.clickhouse.client.logging.LoggerFactory;

/**
* Base class for implementing a thread-safe ClickHouse client. It uses
* {@link ReadWriteLock} to manage access to underlying connection.
*/
public abstract class AbstractClient<T> implements ClickHouseClient {
private static final Logger log = LoggerFactory.getLogger(AbstractClient.class);

private boolean initialized;

private ExecutorService executor;
private ClickHouseConfig config;
private ClickHouseNode server;
private T connection;

protected final ReadWriteLock lock = new ReentrantReadWriteLock();

private void ensureInitialized() {
if (!initialized) {
throw new IllegalStateException("Please initialize the client first");
}
}

// just for testing purpose
final boolean isInitialized() {
return initialized;
}

/**
* Gets executor service for this client.
*
* @return executor service
*/
protected final ExecutorService getExecutor() {
lock.readLock().lock();
try {
ensureInitialized();
return executor;
} finally {
lock.readLock().unlock();
}
}

/**
* Gets current server.
*
* @return current server, could be null
* @throws IllegalStateException when the client is either closed or not
* initialized
*/
protected final ClickHouseNode getServer() {
lock.readLock().lock();
try {
ensureInitialized();
return server;
} finally {
lock.readLock().unlock();
}
}

/**
* Creates a new connection. This method will be called from
* {@link #getConnection(ClickHouseRequest)} as needed.
*
* @param config non-null configuration
* @param server non-null server
* @return new connection
* @throws CompletionException when error occured
*/
protected abstract T newConnection(ClickHouseConfig config, ClickHouseNode server);

/**
* Closes a connection. This method will be called from {@link #close()}.
*
* @param connection connection to close
* @param force whether force to close the connection or not
*/
protected abstract void closeConnection(T connection, boolean force);

/**
* Gets a connection according to the given request.
*
* @param request non-null request
* @return non-null connection
* @throws CompletionException when error occured
*/
protected final T getConnection(ClickHouseRequest<?> request) {
ClickHouseNode newNode = ClickHouseChecker.nonNull(request, "request").getServer();

lock.readLock().lock();
try {
ensureInitialized();
if (connection != null && newNode.equals(server)) {
return connection;
}
} finally {
lock.readLock().unlock();
}

lock.writeLock().lock();
try {
if (connection != null) {
log.debug("Closing connection: %s", connection);
closeConnection(connection, false);
}

server = newNode;
log.debug("Connecting to: %s", newNode);
connection = newConnection(config, server);
log.debug("Connection established: %s", connection);

return connection;
} finally {
lock.writeLock().unlock();
}
}

@Override
public final ClickHouseConfig getConfig() {
lock.readLock().lock();
try {
ensureInitialized();
return config;
} finally {
lock.readLock().unlock();
}
}

@Override
public void init(ClickHouseConfig config) {
ClickHouseChecker.nonNull(config, "config");

lock.writeLock().lock();
try {
this.config = config;
if (this.executor == null) { // only initialize once
int threads = config.getMaxThreadsPerClient();
this.executor = threads <= 0 ? ClickHouseClient.getExecutorService()
: ClickHouseUtils.newThreadPool(getClass().getSimpleName(), threads,
config.getMaxQueuedRequests());
}

initialized = true;
} finally {
lock.writeLock().unlock();
}
}

@Override
public final void close() {
lock.readLock().lock();
try {
if (!initialized) {
return;
}
} finally {
lock.readLock().unlock();
}

lock.writeLock().lock();
try {
server = null;

if (executor != null) {
executor.shutdown();
}

if (connection != null) {
closeConnection(connection, false);
}

// shutdown* won't shutdown commonPool, so awaitTermination will always time out
// on the other hand, for a client-specific thread pool, we'd better shut it
// down for real
if (executor != null && config.getMaxThreadsPerClient() > 0
&& !executor.awaitTermination(config.getConnectionTimeout(), TimeUnit.MILLISECONDS)) {
executor.shutdownNow();
}

executor = null;
connection = null;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (RuntimeException e) {
log.warn("Exception occurred when closing client", e);
} finally {
initialized = false;
try {
if (connection != null) {
closeConnection(connection, true);
}

if (executor != null) {
executor.shutdownNow();
}
} finally {
executor = null;
connection = null;
lock.writeLock().unlock();
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -76,14 +76,16 @@ static <T> CompletableFuture<T> submit(Callable<T> task) {
return (boolean) ClickHouseDefaults.ASYNC.getEffectiveDefaultValue() ? CompletableFuture.supplyAsync(() -> {
try {
return task.call();
} catch (CompletionException e) {
throw e;
} catch (Exception e) {
throw new CompletionException(e);
}
}, getExecutorService()) : CompletableFuture.completedFuture(task.call());
} catch (CompletionException e) {
throw e;
} catch (Exception e) {
throw new CompletionException(e);
throw new CompletionException(e.getCause() != null ? e.getCause() : e);
}
}

Expand Down Expand Up @@ -343,6 +345,11 @@ static CompletableFuture<List<ClickHouseResponseSummary>> send(ClickHouseNode se
list.add(resp.getSummary());
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw ClickHouseExceptionSpecifier.specify(e, theServer);
} catch (ExecutionException e) {
throw ClickHouseExceptionSpecifier.handle(e, theServer);
}

return list;
Expand Down Expand Up @@ -375,6 +382,11 @@ static CompletableFuture<ClickHouseResponseSummary> send(ClickHouseNode server,
ClickHouseResponse resp = client.connect(theServer).format(ClickHouseFormat.RowBinary).query(sql)
.params(params).execute().get()) {
return resp.getSummary();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw ClickHouseExceptionSpecifier.specify(e, theServer);
} catch (ExecutionException e) {
throw ClickHouseExceptionSpecifier.handle(e, theServer);
}
});
}
Expand Down Expand Up @@ -461,6 +473,11 @@ static CompletableFuture<List<ClickHouseResponseSummary>> send(ClickHouseNode se
list.add(resp.getSummary());
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw ClickHouseExceptionSpecifier.specify(e, theServer);
} catch (ExecutionException e) {
throw ClickHouseExceptionSpecifier.handle(e, theServer);
}

return list;
Expand Down Expand Up @@ -507,6 +524,11 @@ static CompletableFuture<List<ClickHouseResponseSummary>> send(ClickHouseNode se
list.add(resp.getSummary());
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw ClickHouseExceptionSpecifier.specify(e, theServer);
} catch (ExecutionException e) {
throw ClickHouseExceptionSpecifier.handle(e, theServer);
}

return list;
Expand Down Expand Up @@ -556,9 +578,9 @@ default ClickHouseRequest<?> connect(Function<ClickHouseNodeSelector, ClickHouse
* object(e.g. prepare for next call using different SQL
* statement) without impacting the execution
* @return future object to get result
* @throws ClickHouseException when error occurred during execution
* @throws CompletionException when error occurred during execution
*/
CompletableFuture<ClickHouseResponse> execute(ClickHouseRequest<?> request) throws ClickHouseException;
CompletableFuture<ClickHouseResponse> execute(ClickHouseRequest<?> request);

/**
* Gets the immutable configuration associated with this client. In most cases
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,6 @@
* whole data set.
*/
public interface ClickHouseRecord extends Iterable<ClickHouseValue>, Serializable {
/**
* Gets size of the record.
*
* @return size of the record
*/
int size();

/**
* Gets deserialized value wrapped in an object using column index. Please avoid
* to cache the wrapper object, as it's reused among records for memory
Expand Down Expand Up @@ -64,4 +57,11 @@ public ClickHouseValue next() {
}
};
}

/**
* Gets size of the record.
*
* @return size of the record
*/
int size();
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public enum ClickHouseClientOption implements ClickHouseConfigOption {
/**
* Maximum size of thread pool for each client.
*/
MAX_THREADS_PER_CLIENT("max_threads_per_client", 1,
MAX_THREADS_PER_CLIENT("max_threads_per_client", 0,
"Size of thread pool for each client instance, 0 or negative number means the client will use shared thread pool."),
/**
* Whether to enable retry.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.clickhouse.client.data;

import java.util.Collections;
import java.util.List;

import com.clickhouse.client.ClickHouseColumn;
Expand All @@ -12,8 +13,10 @@
* which is simply a combination of list of columns and array of values.
*/
public class ClickHouseSimpleRecord implements ClickHouseRecord {
protected final List<ClickHouseColumn> columns;
public static final ClickHouseSimpleRecord EMPTY = new ClickHouseSimpleRecord(Collections.emptyList(),
new ClickHouseValue[0]);

private final List<ClickHouseColumn> columns;
private ClickHouseValue[] values;

/**
Expand All @@ -29,6 +32,8 @@ public static ClickHouseRecord of(List<ClickHouseColumn> columns, ClickHouseValu
} else if (columns.size() != values.length) {
throw new IllegalArgumentException(ClickHouseUtils.format(
"Mismatched count: we have %d columns but we got %d values", columns.size(), values.length));
} else if (values.length == 0) {
return EMPTY;
}

return new ClickHouseSimpleRecord(columns, values);
Expand All @@ -39,6 +44,10 @@ protected ClickHouseSimpleRecord(List<ClickHouseColumn> columns, ClickHouseValue
this.values = values;
}

protected List<ClickHouseColumn> getColumns() {
return columns;
}

protected ClickHouseValue[] getValues() {
return values;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,21 +13,27 @@
import com.clickhouse.client.ClickHouseValues;

/**
* A simple response built on top of two lists: columns and values.
* A simple response built on top of two lists: columns and records.
*/
public class ClickHouseSimpleResponse implements ClickHouseResponse {
public static final ClickHouseSimpleResponse EMPTY = new ClickHouseSimpleResponse(Collections.emptyList(),
new ClickHouseValue[0][]);

private final List<ClickHouseColumn> columns;
private final List<ClickHouseRecord> records;

/**
* Creates a response object using columns definition and raw values.
*
* @param structure column definition
* @param values non-null raw values
* @param columns list of columns
* @param values raw values, which may or may not be null
* @return response object
*/
public static ClickHouseResponse of(String structure, Object[][] values) {
List<ClickHouseColumn> columns = ClickHouseColumn.parse(structure);
public static ClickHouseResponse of(List<ClickHouseColumn> columns, Object[][] values) {
if (columns == null || columns.isEmpty()) {
return EMPTY;
}

int size = columns.size();
int len = values != null ? values.length : 0;

Expand Down
Loading

0 comments on commit 675d382

Please sign in to comment.