Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[client-v2] Async Configuration #1767

Merged
merged 3 commits into from
Aug 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
157 changes: 98 additions & 59 deletions client-v2/src/main/java/com/clickhouse/client/api/Client.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,9 @@
import com.clickhouse.client.config.ClickHouseClientOption;
import com.clickhouse.client.config.ClickHouseDefaults;
import com.clickhouse.client.http.ClickHouseHttpProto;
import com.clickhouse.client.http.config.ClickHouseHttpOption;
import com.clickhouse.data.ClickHouseColumn;
import com.clickhouse.data.ClickHouseDataStreamFactory;
import com.clickhouse.data.ClickHouseFormat;
import com.clickhouse.data.ClickHousePipedOutputStream;
import com.clickhouse.data.format.BinaryStreamUtils;
import org.apache.commons.compress.compressors.lz4.BlockLZ4CompressorOutputStream;
import org.apache.commons.compress.compressors.lz4.FramedLZ4CompressorOutputStream;
import org.apache.hc.core5.concurrent.DefaultThreadFactory;
import org.apache.hc.core5.http.ClassicHttpResponse;
import org.apache.hc.core5.http.HttpStatus;
Expand Down Expand Up @@ -81,6 +76,7 @@
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

import static java.time.temporal.ChronoUnit.SECONDS;

Expand Down Expand Up @@ -116,22 +112,23 @@
public class Client implements AutoCloseable {
private HttpAPIClientHelper httpClientHelper = null;

private Set<String> endpoints;
private Map<String, String> configuration;
private List<ClickHouseNode> serverNodes = new ArrayList<>();
private Map<Class<?>, List<POJOSerializer>> serializers; //Order is important to preserve for RowBinary
private Map<Class<?>, Map<String, Method>> getterMethods;
private Map<Class<?>, Boolean> hasDefaults; // Whether the POJO has defaults
private final Set<String> endpoints;
private final Map<String, String> configuration;
private final List<ClickHouseNode> serverNodes = new ArrayList<>();
private final Map<Class<?>, List<POJOSerializer>> serializers; //Order is important to preserve for RowBinary
private final Map<Class<?>, Map<String, Method>> getterMethods;
private final Map<Class<?>, Boolean> hasDefaults; // Whether the POJO has defaults
private static final Logger LOG = LoggerFactory.getLogger(Client.class);
private ExecutorService sharedOperationExecutor;
private final ExecutorService sharedOperationExecutor;

private Map<String, ClientStatisticsHolder> globalClientStats = new ConcurrentHashMap<>();
private final Map<String, ClientStatisticsHolder> globalClientStats = new ConcurrentHashMap<>();

private boolean useNewImplementation = false;

private ClickHouseClient oldClient = null;

private Client(Set<String> endpoints, Map<String,String> configuration, boolean useNewImplementation) {
private Client(Set<String> endpoints, Map<String,String> configuration, boolean useNewImplementation,
ExecutorService sharedOperationExecutor) {
this.endpoints = endpoints;
this.configuration = configuration;
this.endpoints.forEach(endpoint -> {
Expand All @@ -141,7 +138,12 @@ private Client(Set<String> endpoints, Map<String,String> configuration, boolean
this.getterMethods = new HashMap<>();
this.hasDefaults = new HashMap<>();

this.sharedOperationExecutor = Executors.newCachedThreadPool(new DefaultThreadFactory("chc-operation"));
boolean isAsyncEnabled = MapUtils.getFlag(this.configuration, ClickHouseClientOption.ASYNC.getKey());
if (isAsyncEnabled && sharedOperationExecutor == null) {
this.sharedOperationExecutor = Executors.newCachedThreadPool(new DefaultThreadFactory("chc-operation"));
} else {
this.sharedOperationExecutor = sharedOperationExecutor;
}
this.useNewImplementation = useNewImplementation;
if (useNewImplementation) {
this.httpClientHelper = new HttpAPIClientHelper(configuration);
Expand Down Expand Up @@ -171,7 +173,7 @@ public String getDefaultDatabase() {
@Override
public void close() {
try {
if (!sharedOperationExecutor.isShutdown()) {
if (sharedOperationExecutor != null && !sharedOperationExecutor.isShutdown()) {
this.sharedOperationExecutor.shutdownNow();
}
} catch (Exception e) {
Expand All @@ -190,6 +192,8 @@ public static class Builder {
private Map<String, String> configuration;
private boolean useNewImplementation = false;

private ExecutorService sharedOperationExecutor = null;

public Builder() {
this.endpoints = new HashSet<>();
this.configuration = new HashMap<String, String>();
Expand Down Expand Up @@ -605,6 +609,34 @@ public Builder setServerTimeZone(String timeZone) {
return this;
}

/**
* Configures client to execute requests in a separate thread. By default, operations (query, insert)
* are executed in the same thread as the caller.
* It is possible to set a shared executor for all operations. See {@link #setSharedOperationExecutor(ExecutorService)}
*
* Note: Async operations a using executor what expects having a queue of tasks for a pool of executors.
* The queue size limit is small it may quickly become a problem for scheduling new tasks.
*
* @param async - if to use async requests
* @return
*/
public Builder useAsyncRequests(boolean async) {
this.configuration.put(ClickHouseClientOption.ASYNC.getKey(), String.valueOf(async));
return this;
}

/**
* Sets an executor for running operations. If async operations are enabled and no executor is specified
* client will create a default executor.
*
* @param executorService - executor service for async operations
* @return
*/
public Builder setSharedOperationExecutor(ExecutorService executorService) {
this.sharedOperationExecutor = executorService;
return this;
}

public Client build() {
this.configuration = setDefaults(this.configuration);

Expand Down Expand Up @@ -650,7 +682,7 @@ public Client build() {
throw new IllegalArgumentException("Nor server timezone nor specific timezone is set");
}

return new Client(this.endpoints, this.configuration, this.useNewImplementation);
return new Client(this.endpoints, this.configuration, this.useNewImplementation, this.sharedOperationExecutor);
}

private Map<String, String> setDefaults(Map<String, String> userConfig) {
Expand Down Expand Up @@ -683,6 +715,10 @@ private Map<String, String> setDefaults(Map<String, String> userConfig) {
userConfig.put(ClickHouseClientOption.SERVER_TIME_ZONE.getKey(), "UTC");
}

if (!userConfig.containsKey(ClickHouseClientOption.ASYNC.getKey())) {
userConfig.put(ClickHouseClientOption.ASYNC.getKey(), "false");
}

return userConfig;
}
}
Expand Down Expand Up @@ -853,7 +889,7 @@ public CompletableFuture<InsertResponse> insert(String tableName, List<?> data,

settings.setOption(ClickHouseClientOption.FORMAT.getKey(), format.name());
final InsertSettings finalSettings = settings;
CompletableFuture<InsertResponse> future = CompletableFuture.supplyAsync(() -> {
Supplier<InsertResponse> supplier = () -> {
// Selecting some node
ClickHouseNode selectedNode = getNextAliveNode();

Expand Down Expand Up @@ -905,8 +941,9 @@ public CompletableFuture<InsertResponse> insert(String tableName, List<?> data,
}
}
throw new ClientException("Failed to get table schema: too many retries");
}, sharedOperationExecutor);
return future;
};

return runAsyncOperation(supplier, settings.getAllSettings());
} else {
//Create an output stream to write the data to
ByteArrayOutputStream stream = new ByteArrayOutputStream();
Expand Down Expand Up @@ -961,6 +998,7 @@ public CompletableFuture<InsertResponse> insert(String tableName,
ClientStatisticsHolder clientStats = globalClientStats.remove(operationId);
clientStats.start(ClientMetrics.OP_DURATION);

Supplier<InsertResponse> responseSupplier;
if (useNewImplementation) {

String retry = configuration.get(ClickHouseClientOption.RETRY.getKey());
Expand All @@ -975,7 +1013,7 @@ public CompletableFuture<InsertResponse> insert(String tableName,

settings.setOption(ClickHouseClientOption.FORMAT.getKey(), format.name());
final InsertSettings finalSettings = settings;
CompletableFuture<InsertResponse> future = CompletableFuture.supplyAsync(() -> {
responseSupplier = () -> {
// Selecting some node
ClickHouseNode selectedNode = getNextAliveNode();

Expand Down Expand Up @@ -1032,29 +1070,23 @@ public CompletableFuture<InsertResponse> insert(String tableName,
}
}
throw new ClientException("Failed to insert data: too many retries");
}, sharedOperationExecutor);
return future;
};
} else {
CompletableFuture<InsertResponse> responseFuture = new CompletableFuture<>();

ClickHouseRequest.Mutation request = ClientV1AdaptorHelper
.createMutationRequest(oldClient.write(getServerNode()), tableName, settings, configuration).format(format);

CompletableFuture<ClickHouseResponse> future = null;
try (ClickHousePipedOutputStream stream = ClickHouseDataStreamFactory.getInstance().createPipedOutputStream(request.getConfig())) {
future = request.data(stream.getInputStream()).execute();

//Copy the data from the input stream to the output stream
byte[] buffer = new byte[settings.getInputStreamCopyBufferSize()];
int bytesRead;
while ((bytesRead = data.read(buffer)) != -1) {
stream.write(buffer, 0, bytesRead);
}
} catch (IOException e) {
responseFuture.completeExceptionally(new ClientException("Failed to write data to the output stream", e));
}
responseSupplier = () -> {
ClickHouseRequest.Mutation request = ClientV1AdaptorHelper
.createMutationRequest(oldClient.write(getServerNode()), tableName, settings, configuration).format(format);

CompletableFuture<ClickHouseResponse> future = null;
future = request.data(output -> {
//Copy the data from the input stream to the output stream
byte[] buffer = new byte[settings.getInputStreamCopyBufferSize()];
int bytesRead;
while ((bytesRead = data.read(buffer)) != -1) {
output.write(buffer, 0, bytesRead);
}
output.close();
}).option(ClickHouseClientOption.ASYNC, false).execute();

if (!responseFuture.isCompletedExceptionally()) {
try {
int operationTimeout = getOperationTimeout();
ClickHouseResponse clickHouseResponse;
Expand All @@ -1064,17 +1096,17 @@ public CompletableFuture<InsertResponse> insert(String tableName,
clickHouseResponse = future.get();
}
InsertResponse response = new InsertResponse(clickHouseResponse, clientStats);
responseFuture.complete(response);
LOG.debug("Total insert (InputStream) time: {}", clientStats.getElapsedTime("insert"));
return response;
} catch (ExecutionException e) {
responseFuture.completeExceptionally(new ClientException("Failed to get insert response", e.getCause()));
throw new ClientException("Failed to get insert response", e.getCause());
} catch (InterruptedException | TimeoutException e) {
responseFuture.completeExceptionally(new ClientException("Operation has likely timed out.", e));
throw new ClientException("Operation has likely timed out.", e);
}
}
LOG.debug("Total insert (InputStream) time: {}", clientStats.getElapsedTime("insert"));

return responseFuture;
};
}

return runAsyncOperation(responseSupplier, settings.getAllSettings());
}

/**
Expand Down Expand Up @@ -1140,6 +1172,8 @@ public CompletableFuture<QueryResponse> query(String sqlQuery, Map<String, Objec
clientStats.start(ClientMetrics.OP_DURATION);
applyDefaults(settings);

Supplier<QueryResponse> responseSupplier;

if (useNewImplementation) {
String retry = configuration.get(ClickHouseClientOption.RETRY.getKey());
final int maxRetries = retry == null ? (int) ClickHouseClientOption.RETRY.getDefaultValue() : Integer.parseInt(retry);
Expand All @@ -1148,7 +1182,7 @@ public CompletableFuture<QueryResponse> query(String sqlQuery, Map<String, Objec
settings.setOption("statement_params", queryParams);
}
final QuerySettings finalSettings = settings;
CompletableFuture<QueryResponse> future = CompletableFuture.supplyAsync(() -> {
responseSupplier = () -> {
// Selecting some node
ClickHouseNode selectedNode = getNextAliveNode();
for (int i = 0; i <= maxRetries; i++) {
Expand Down Expand Up @@ -1183,8 +1217,7 @@ public CompletableFuture<QueryResponse> query(String sqlQuery, Map<String, Objec
}
}
throw new ClientException("Failed to get table schema: too many retries");
}, sharedOperationExecutor);
return future;
};
} else {
ClickHouseRequest<?> request = oldClient.read(getServerNode());
request.options(SettingsConverter.toRequestOptions(settings.getAllSettings()));
Expand All @@ -1195,7 +1228,7 @@ public CompletableFuture<QueryResponse> query(String sqlQuery, Map<String, Objec
request.format(format);

final QuerySettings finalSettings = settings;
CompletableFuture<QueryResponse> future = CompletableFuture.supplyAsync(() -> {
responseSupplier = () -> {
LOG.trace("Executing request: {}", request);
try {

Expand All @@ -1213,9 +1246,10 @@ public CompletableFuture<QueryResponse> query(String sqlQuery, Map<String, Objec
} catch (Exception e) {
throw new ClientException("Failed to get query response", e);
}
}, sharedOperationExecutor);
return future;
};
}

return runAsyncOperation(responseSupplier, settings.getAllSettings());
}

/**
Expand Down Expand Up @@ -1247,13 +1281,13 @@ public CompletableFuture<Records> queryRecords(String sqlQuery, QuerySettings se
settings.waitEndOfQuery(true); // we rely on the summery

final QuerySettings finalSettings = settings;
return query(sqlQuery, settings).thenApplyAsync(response -> {
return query(sqlQuery, settings).thenApply(response -> {
try {
return new Records(response, finalSettings);
} catch (Exception e) {
throw new ClientException("Failed to get query response", e);
}
}, sharedOperationExecutor);
});
}

/**
Expand Down Expand Up @@ -1355,13 +1389,13 @@ public CompletableFuture<CommandResponse> execute(String sql, CommandSettings se
*/
public CompletableFuture<CommandResponse> execute(String sql) {
return query(sql)
.thenApplyAsync(response -> {
.thenApply(response -> {
try {
return new CommandResponse(response);
} catch (Exception e) {
throw new ClientException("Failed to get command response", e);
}
}, sharedOperationExecutor);
});
}

/**
Expand Down Expand Up @@ -1422,6 +1456,11 @@ private void applyDefaults(QuerySettings settings) {
}
}

private <T> CompletableFuture<T> runAsyncOperation(Supplier<T> resultSupplier, Map<String, Object> requestSettings) {
boolean isAsync = MapUtils.getFlag(configuration, requestSettings, ClickHouseClientOption.ASYNC.getKey());
return isAsync ? CompletableFuture.supplyAsync(resultSupplier, sharedOperationExecutor) : CompletableFuture.completedFuture(resultSupplier.get());
}

public String toString() {
return "Client{" +
"endpoints=" + endpoints +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,20 +56,28 @@ public static boolean getFlag(Map<String, String> map, String key) {
throw new IllegalArgumentException("Invalid non-boolean value for the key '" + key + "': '" + val + "'");
}

public static boolean getFlag(Map<String, String> p1, Map<String, String> p2, String key) {
String val = p1.get(key);
public static boolean getFlag(Map<String, ?> p1, Map<String, ?> p2, String key) {
Object val = p1.get(key);
if (val == null) {
val = p2.get(key);
}
if (val == null) {
throw new NullPointerException("Missing value for the key '" + key + "'");
}
if (val.equalsIgnoreCase("true")) {
return true;
} else if (val.equalsIgnoreCase("false")) {
return false;
}

throw new IllegalArgumentException("Invalid non-boolean value for the key '" + key + "': '" + val + "'");
if (val instanceof Boolean) {
return (Boolean) val;
} else if (val instanceof String) {
String str = (String) val;
if (str.equalsIgnoreCase("true")) {
return true;
} else if (str.equalsIgnoreCase("false")) {
return false;
} else {
throw new IllegalArgumentException("Invalid non-boolean value for the key '" + key + "': '" + val + "'");
}
} else {
throw new IllegalArgumentException("Invalid non-boolean value for the key '" + key + "': '" + val + "'");
}
}
}
Loading