Skip to content

Commit

Permalink
Merge pull request #995 from zhicwu/develop
Browse files Browse the repository at this point in the history
Fix slowness in performance mode and failover not working when protocol is unsupported
  • Loading branch information
zhicwu authored Jul 15, 2022
2 parents 9b21161 + c010b4d commit db0bd80
Show file tree
Hide file tree
Showing 18 changed files with 464 additions and 239 deletions.
10 changes: 5 additions & 5 deletions clickhouse-cli-client/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,11 @@ This is a thin wrapper of ClickHouse native command-line client. It provides an
- native CLI client instead of pure Java implementation
- an example of implementing SPI defined in `clickhouse-client` module

Either [clickhouse-client](https://clickhouse.com/docs/en/interfaces/cli/) or [docker](https://docs.docker.com/get-docker/) must be installed prior to use. And it's important to understand that this module uses sub-process(in addition to threads) and file-based streaming, meaning 1) it's not as fast as native CLI client or pure Java implementation, although it's close in the case of dumping and loading data; and 2) it's not suitable for scenarios like dealing with many queries in short period of time.
Either [clickhouse](https://clickhouse.com/docs/en/interfaces/cli/) or [docker](https://docs.docker.com/get-docker/) must be installed prior to use. It's important to understand that this module uses sub-processes (in addition to threads) and file-based streaming, meaning 1) it's not as fast as the native CLI client or a pure Java implementation, although it's close in the case of dumping and loading data; and 2) it's not suitable for scenarios like dealing with many queries in a short period of time.

## Limitations and Known Issues

- Only `max_result_rows` and `result_overflow_mode` two settings are currently supported
- Only three settings are currently supported: `max_result_rows`, `result_overflow_mode` and `readonly`
- ClickHouseResponseSummary is always empty - see ClickHouse/ClickHouse#37241
- Session is not supported - see ClickHouse/ClickHouse#37308

Expand All @@ -28,10 +28,10 @@ Either [clickhouse-client](https://clickhouse.com/docs/en/interfaces/cli/) or [d
## Examples

```java
// make sure 'clickhouse-client' or 'docker' is in PATH before you start the program
// make sure 'clickhouse' or 'docker' is in PATH before you start the program
// alternatively, configure the CLI path in either a Java system property or an environment variable, for example:
// CHC_CLICKHOUSE_CLI_PATH=/path/to/clickhouse-client CHC_DOCKER_CLI_PATH=/path/to/docker java MyProgram
// java -Dchc_clickhouse_cli_path=/path/to/clickhouse-client -Dchc_docker_cli_path=/path/to/docker MyProgram
// CHC_CLICKHOUSE_CLI_PATH=/path/to/clickhouse CHC_DOCKER_CLI_PATH=/path/to/docker java MyProgram
// java -Dchc_clickhouse_cli_path=/path/to/clickhouse -Dchc_docker_cli_path=/path/to/docker MyProgram

// clickhouse-cli-client uses TCP protocol
ClickHouseProtocol preferredProtocol = ClickHouseProtocol.TCP;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.net.ConnectException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
Expand Down Expand Up @@ -93,7 +94,7 @@ static void dockerCommand(ClickHouseConfig config, String hostDir, String contai
cli = DEFAULT_DOCKER_CLI_PATH;
}
if (!check(timeout, cli, DEFAULT_CLI_ARG_VERSION)) {
throw new IllegalStateException("Docker command-line is not available: " + cli);
throw new UncheckedIOException(new ConnectException("Docker command-line is not available: " + cli));
} else {
commands.add(cli);
}
Expand All @@ -111,7 +112,7 @@ static void dockerCommand(ClickHouseConfig config, String hostDir, String contai
DEFAULT_CLI_ARG_VERSION)
&& !check(timeout, cli, "run", "--rm", "--name", str, "-v", hostDir + ':' + containerDir,
"-d", img, "tail", "-f", "/dev/null")) {
throw new IllegalStateException("Failed to start new container: " + str);
throw new UncheckedIOException(new ConnectException("Failed to start new container: " + str));
}
}
}
Expand All @@ -122,7 +123,7 @@ static void dockerCommand(ClickHouseConfig config, String hostDir, String contai
} else { // create new container for each query
if (!check(timeout, cli, "run", "--rm", img, DEFAULT_CLICKHOUSE_CLI_PATH, DEFAULT_CLIENT_OPTION,
DEFAULT_CLI_ARG_VERSION)) {
throw new IllegalStateException("Invalid ClickHouse docker image: " + img);
throw new UncheckedIOException(new ConnectException("Invalid ClickHouse docker image: " + img));
}
commands.add("run");
commands.add("--rm");
Expand Down Expand Up @@ -235,6 +236,10 @@ static Process startProcess(ClickHouseNode server, ClickHouseRequest<?> request)
if (value != null) {
commands.add("--result_overflow_mode=".concat(value.toString()));
}
value = settings.get("readonly");
if (value != null) {
commands.add("--readonly=".concat(value.toString()));
}
if ((boolean) config.getOption(ClickHouseCommandLineOption.USE_PROFILE_EVENTS)) {
commands.add("--print-profile-events");
commands.add("--profile-events-delay-ms=-1");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@
public enum ClickHouseCommandLineOption implements ClickHouseOption {
/**
* ClickHouse native command-line client path. Empty value is treated as
* 'clickhouse-client'.
* 'clickhouse'.
*/
CLICKHOUSE_CLI_PATH("clickhouse_cli_path", "",
"ClickHouse native command-line client path, empty value is treated as 'clickhouse-client'"),
"ClickHouse native command-line client path, empty value is treated as 'clickhouse'"),
/**
* ClickHouse docker image. Empty value is treated as
* 'clickhouse/clickhouse-server'.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ static ClickHouseInputStream getResponseInputStream(ClickHouseConfig config, Inp

/**
* Gets piped input stream for reading data from response asynchronously. When
* {@code config} is null or {@code config.isAsync()} is faluse, this method is
* {@code config} is null or {@code config.isAsync()} is false, this method is
* the same as
* {@link #getResponseInputStream(ClickHouseConfig, InputStream, Runnable)}.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package com.clickhouse.client;

import java.io.Serializable;
import java.io.UncheckedIOException;
import java.net.ConnectException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
Expand All @@ -27,25 +30,36 @@
*/
public class ClickHouseClientBuilder {
/**
* Dummy client which is only used {@link Agent}.
* Dummy client which is only used by {@link Agent}.
*/
static class DummyClient implements ClickHouseClient {
static final ClickHouseConfig CONFIG = new ClickHouseConfig();
static final DummyClient INSTANCE = new DummyClient();
static final ClickHouseConfig DEFAULT_CONFIG = new ClickHouseConfig();

private final ClickHouseConfig config;

DummyClient() {
this(null);
}

DummyClient(ClickHouseConfig config) {
this.config = config != null ? config : DEFAULT_CONFIG;
}

@Override
public boolean accept(ClickHouseProtocol protocol) {
return true;
return false;
}

@Override
public CompletableFuture<ClickHouseResponse> execute(ClickHouseRequest<?> request) {
return CompletableFuture.completedFuture(ClickHouseResponse.EMPTY);
CompletableFuture<ClickHouseResponse> future = new CompletableFuture<>();
future.completeExceptionally(new ConnectException("No client available"));
return future;
}

@Override
public ClickHouseConfig getConfig() {
return CONFIG;
return config;
}

@Override
Expand All @@ -55,7 +69,7 @@ public void close() {

@Override
public boolean ping(ClickHouseNode server, int timeout) {
return true;
return false;
}
}

Expand All @@ -68,8 +82,8 @@ static final class Agent implements ClickHouseClient {

private final AtomicReference<ClickHouseClient> client;

Agent(ClickHouseClient client) {
this.client = new AtomicReference<>(client != null ? client : DummyClient.INSTANCE);
Agent(ClickHouseClient client, ClickHouseConfig config) {
this.client = new AtomicReference<>(client != null ? client : new DummyClient(config));
}

ClickHouseClient getClient() {
Expand All @@ -90,25 +104,27 @@ boolean changeClient(ClickHouseClient currentClient, ClickHouseClient newClient)
return changed;
}

ClickHouseResponse failover(ClickHouseRequest<?> sealedRequest, Throwable cause, int times) {
ClickHouseResponse failover(ClickHouseRequest<?> sealedRequest, ClickHouseException exception, int times) {
for (int i = 1; i <= times; i++) {
log.debug("Failover %d of %d due to: %s", i, times, cause.getMessage());
log.debug("Failover %d of %d due to: %s", i, times, exception.getCause(), null);
ClickHouseNode current = sealedRequest.getServer();
ClickHouseNodeManager manager = current.manager.get();
if (manager == null) {
break;
}
ClickHouseNode next = manager.suggestNode(current, cause);
ClickHouseNode next = manager.suggestNode(current, exception);
if (next == current) {
log.debug("Cancel failover for same node returned from %s", manager.getPolicy());
break;
}
current.update(Status.FAULTY);
next = sealedRequest.changeServer(current, next);
if (next == current) {
log.debug("Cancel failover for no alternative of %s", current);
break;
}

log.info("Switching node from %s to %s due to: %s", current, next, cause.getMessage());
log.info("Switching node from %s to %s due to: %s", current, next, exception.getCause(), null);
final ClickHouseProtocol protocol = next.getProtocol();
final ClickHouseClient currentClient = client.get();
if (!currentClient.accept(protocol)) {
Expand All @@ -118,64 +134,70 @@ ClickHouseResponse failover(ClickHouseRequest<?> sealedRequest, Throwable cause,
.config(new ClickHouseConfig(currentClient.getConfig(), next.config))
.nodeSelector(ClickHouseNodeSelector.of(protocol)).build();
} catch (Exception e) {
cause = e;
continue;
exception = ClickHouseException.of(new ConnectException("No client available for " + next),
sealedRequest.getServer());
} finally {
if (newClient != null) {
boolean changed = changeClient(currentClient, newClient);
log.debug("Switching client from %s to %s: %s", currentClient, newClient, changed);
log.info("Switching client from %s to %s: %s", currentClient, newClient, changed);
if (changed) {
sealedRequest.resetCache();
}
}
}

if (newClient == null) {
continue;
}
}

try {
return sendOnce(sealedRequest);
} catch (Exception exp) {
cause = exp.getCause();
if (cause == null) {
cause = exp;
}
exception = ClickHouseException.of(exp.getCause() != null ? exp.getCause() : exp,
sealedRequest.getServer());
}
}

throw new CompletionException(cause);
throw new CompletionException(exception);
}

ClickHouseResponse retry(ClickHouseRequest<?> sealedRequest, Throwable cause, int times) {
ClickHouseResponse retry(ClickHouseRequest<?> sealedRequest, ClickHouseException exception, int times) {
for (int i = 1; i <= times; i++) {
log.debug("Retry %d of %d due to: %s", i, times, cause.getMessage());
log.debug("Retry %d of %d due to: %s", i, times, exception.getMessage());
// TODO retry idempotent query
if (cause instanceof ClickHouseException
&& ((ClickHouseException) cause).getErrorCode() == ClickHouseException.ERROR_NETWORK) {
if (exception.getErrorCode() == ClickHouseException.ERROR_NETWORK) {
log.info("Retry request on %s due to connection issue", sealedRequest.getServer());
try {
return sendOnce(sealedRequest);
} catch (Exception exp) {
cause = exp.getCause();
if (cause == null) {
cause = exp;
}
exception = ClickHouseException.of(exp.getCause() != null ? exp.getCause() : exp,
sealedRequest.getServer());
}
}
}

throw new CompletionException(cause);
throw new CompletionException(exception);
}

ClickHouseResponse handle(ClickHouseRequest<?> sealedRequest, Throwable cause) {
// in case there's any recoverable exception wrapped by UncheckedIOException
if (cause instanceof UncheckedIOException && cause.getCause() != null) {
cause = ((UncheckedIOException) cause).getCause();
}

log.debug("Handling %s(failover=%d, retry=%d)", cause, sealedRequest.getConfig().getFailover(),
sealedRequest.getConfig().getRetry());
try {
int times = sealedRequest.getConfig().getFailover();
if (times > 0) {
return failover(sealedRequest, cause, times);
return failover(sealedRequest, ClickHouseException.of(cause, sealedRequest.getServer()), times);
}

// different from failover: 1) retry on the same node; 2) never retry on timeout
times = sealedRequest.getConfig().getRetry();
if (times > 0) {
return retry(sealedRequest, cause, times);
return retry(sealedRequest, ClickHouseException.of(cause, sealedRequest.getServer()), times);
}

throw new CompletionException(cause);
Expand All @@ -200,8 +222,8 @@ ClickHouseResponse sendOnce(ClickHouseRequest<?> sealedRequest) {
ClickHouseResponse send(ClickHouseRequest<?> sealedRequest) {
try {
return sendOnce(sealedRequest);
} catch (CompletionException e) {
return handle(sealedRequest, e.getCause());
} catch (Exception e) {
return handle(sealedRequest, e.getCause() != null ? e.getCause() : e);
}
}

Expand All @@ -228,9 +250,32 @@ public boolean ping(ClickHouseNode server, int timeout) {
@Override
public CompletableFuture<ClickHouseResponse> execute(ClickHouseRequest<?> request) {
final ClickHouseRequest<?> sealedRequest = request.seal();
final ClickHouseNode server = sealedRequest.getServer();
final ClickHouseProtocol protocol = server.getProtocol();
final ClickHouseClient currentClient = client.get();
if (!currentClient.accept(protocol)) {
ClickHouseClient newClient = null;
try {
newClient = ClickHouseClient.builder().agent(false)
.config(new ClickHouseConfig(currentClient.getConfig(), server.config))
.nodeSelector(ClickHouseNodeSelector.of(protocol)).build();
} catch (IllegalStateException e) {
// let it fail on execution phase
log.debug("Failed to find client for %s", server);
} finally {
if (newClient != null) {
boolean changed = changeClient(currentClient, newClient);
log.debug("Switching client from %s to %s: %s", currentClient, newClient, changed);
if (changed) {
sealedRequest.resetCache();
}
}
}
}
return sealedRequest.getConfig().isAsync()
? getClient().execute(sealedRequest)
.handle((r, t) -> t == null ? r : handle(sealedRequest, t.getCause()))
.handle((r, t) -> t == null ? r
: handle(sealedRequest, t.getCause() != null ? t.getCause() : t))
: CompletableFuture.completedFuture(send(sealedRequest));
}

Expand Down Expand Up @@ -339,26 +384,28 @@ public ClickHouseConfig getConfig() {
public ClickHouseClient build() {
ClickHouseClient client = null;

boolean noSelector = nodeSelector == null || nodeSelector == ClickHouseNodeSelector.EMPTY;
int counter = 0;
ClickHouseConfig conf = getConfig();
for (ClickHouseClient c : loadClients()) {
c.init(conf);
int counter = 0;
if (nodeSelector != null) {
for (ClickHouseClient c : loadClients()) {
c.init(conf);

counter++;
if (noSelector || nodeSelector.match(c)) {
client = c;
break;
counter++;
if (nodeSelector == ClickHouseNodeSelector.EMPTY || nodeSelector.match(c)) {
client = c;
break;
}
}
}

if (client == null) {
if (agent) {
return new Agent(client, conf);
} else if (client == null) {
throw new IllegalStateException(
ClickHouseUtils.format("No suitable ClickHouse client(out of %d) found in classpath for %s.",
counter, nodeSelector));
}

return agent ? new Agent(client) : client;
return client;
}

/**
Expand Down Expand Up @@ -475,7 +522,11 @@ public ClickHouseClientBuilder defaultCredentials(ClickHouseCredentials credenti
*/
public ClickHouseClientBuilder nodeSelector(ClickHouseNodeSelector nodeSelector) {
if (!ClickHouseChecker.nonNull(nodeSelector, "nodeSelector").equals(this.nodeSelector)) {
this.nodeSelector = nodeSelector;
this.nodeSelector = (nodeSelector.getPreferredProtocols().isEmpty() || nodeSelector.getPreferredProtocols()
.equals(Collections.singletonList(ClickHouseProtocol.ANY)))
&& nodeSelector.getPreferredTags().isEmpty()
? ClickHouseNodeSelector.EMPTY
: nodeSelector;
resetConfig();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,6 @@ public String toString() {
.append(checking.get()).append(", index=").append(index.get()).append(", lock=r")
.append(lock.getReadHoldCount()).append('w').append(lock.getWriteHoldCount()).append(", nodes=")
.append(nodes.size()).append(", faulty=").append(faultyNodes.size()).append(", policy=")
.append(policy.getClass().getSimpleName()).append(']').toString();
.append(policy.getClass().getSimpleName()).append("]@").append(hashCode()).toString();
}
}
Loading

0 comments on commit db0bd80

Please sign in to comment.