Skip to content

Commit

Permalink
Make HttpUrlConnection default implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
zhicwu committed Dec 15, 2021
1 parent d9e9df8 commit 1f6666a
Show file tree
Hide file tree
Showing 22 changed files with 355 additions and 99 deletions.
2 changes: 1 addition & 1 deletion clickhouse-benchmark/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

<properties>
<clickhouse4j-driver.version>1.4.4</clickhouse4j-driver.version>
<native-driver.version>2.6.1</native-driver.version>
<native-driver.version>2.6.3</native-driver.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<jmh.version>1.33</jmh.version>
<shade.name>benchmarks</shade.name>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,11 @@ public enum JdbcDriver {
"jdbc:clickhouse://%s:%s/%s?ssl=false&user=%s&password=%s&use_server_time_zone=false&use_time_zone=UTC&compress=%s",
Constants.HTTP_PORT),
// ClickHouse JDBC Driver
ClickhouseHttpJdbc("com.clickhouse.jdbc.ClickHouseDriver",
"jdbc:ch://%s:%s/%s?ssl=false&user=%s&password=%s&use_server_time_zone=false&use_time_zone=UTC&compress=%s",
ClickhouseHttpJdbc1("com.clickhouse.jdbc.ClickHouseDriver",
"jdbc:ch://%s:%s/%s?http_connection_provider=HTTP_URL_CONNECTION&ssl=false&user=%s&password=%s&use_server_time_zone=false&use_time_zone=UTC&compress=%s",
Constants.HTTP_PORT),
ClickhouseHttpJdbc2("com.clickhouse.jdbc.ClickHouseDriver",
"jdbc:ch://%s:%s/%s?http_connection_provider=HTTP_CLIENT&ssl=false&user=%s&password=%s&use_server_time_zone=false&use_time_zone=UTC&compress=%s",
Constants.HTTP_PORT),
ClickhouseGrpcJdbc("com.clickhouse.jdbc.ClickHouseDriver",
"jdbc:ch:grpc://%s:%s/%s?ssl=false&user=%s&password=%s&use_server_time_zone=false&use_time_zone=UTC&max_inbound_message_size=2147483647&compress=%s",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public class ClickHouseClientBuilder {
}

defaultExecutor = ClickHouseUtils.newThreadPool(ClickHouseClient.class.getSimpleName(), maxThreads,
maxThreads * 2, maxRequests, keepAliveTimeoutMs);
maxThreads * 2, maxRequests, keepAliveTimeoutMs, false);
}

protected ClickHouseConfig config;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,24 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Map.Entry;

/**
* A parameterized query is a parsed query with parameters being extracted for
* substitution.
* <p>
* Here parameter is define in the format of {@code :<name>[(<type>)]}. It
* starts with colon, followed by name, and then optionally type within
* brackets. For example: in query "select :no as no, :name(String) as name",
* both {@code no} and {@code name} are parameters. Moreover, type of the last
* parameter is {@code String}.
* starts with colon, immediately followed by name, and then optionally type
* within brackets. For example: in query "select :no as no, :name(String) as
* name", we have two parameters: {@code no} and {@code name}. Moreover, type of
* the last parameter is {@code String}.
*/
public class ClickHouseParameterizedQuery implements Serializable {
private static final long serialVersionUID = 8108887349618342152L;
Expand Down Expand Up @@ -271,6 +273,19 @@ protected String parse() {
return partIndex < len ? originalQuery.substring(partIndex, len) : null;
}

/**
* Appends last part of the query if it exists.
*
* @param builder non-null string builder
* @return the builder
*/
protected StringBuilder appendLastPartIfExists(StringBuilder builder) {
if (lastPart != null) {
builder.append(lastPart);
}
return builder;
}

/**
* Converts given raw value to SQL expression.
*
Expand Down Expand Up @@ -305,10 +320,7 @@ public String apply(Map<String, String> params) {
builder.append(params.getOrDefault(p.paramName, ClickHouseValues.NULL_EXPR));
}

if (lastPart != null) {
builder.append(lastPart);
}
return builder.toString();
return appendLastPartIfExists(builder).toString();
}

/**
Expand All @@ -318,23 +330,25 @@ public String apply(Map<String, String> params) {
* @return substituted query
*/
public String apply(Collection<String> params) {
if (!hasParameter()) {
return originalQuery;
}

StringBuilder builder = new StringBuilder();
Iterator<String> it = params == null ? null : params.iterator();
boolean hasMore = it != null && it.hasNext();
for (QueryPart p : parts) {
builder.append(p.part);
builder.append(hasMore ? it.next() : ClickHouseValues.NULL_EXPR);
hasMore = hasMore && it.hasNext();
}

if (lastPart != null) {
builder.append(lastPart);
if (params == null || params.isEmpty()) {
return apply(Collections.emptyMap());
}

Map<String, String> map = null;
Iterator<String> it = params.iterator();
if (it.hasNext()) {
map = new HashMap<>();
for (String n : names.keySet()) {
String v = it.next();
if (v != null) {
map.put(n, v);
}
if (!it.hasNext()) {
break;
}
}
}
return builder.toString();
return apply(map);
}

/**
Expand All @@ -351,22 +365,24 @@ public String apply(Object param, Object... more) {
return originalQuery;
}

int len = more == null ? 0 : more.length + 1;
StringBuilder builder = new StringBuilder();
int index = 0;
for (QueryPart p : parts) {
builder.append(p.part);
if (index > 0) {
param = index < len ? more[index - 1] : null;
int len = more == null ? 0 : more.length;
Map<String, String> map = new HashMap<>();
int index = -1;
for (Entry<String, ClickHouseValue> e : names.entrySet()) {
ClickHouseValue v = e.getValue();
if (index < 0) {
map.put(e.getKey(),
v != null ? v.update(param).toSqlExpression() : ClickHouseValues.convertToSqlExpression(param));
} else if (index < len) {
map.put(e.getKey(), v != null ? v.update(more[index]).toSqlExpression()
: ClickHouseValues.convertToSqlExpression(more[index]));
} else {
break;
}
builder.append(toSqlExpression(p.paramName, param));
index++;
}

if (lastPart != null) {
builder.append(lastPart);
}
return builder.toString();
return apply(map);
}

/**
Expand All @@ -378,24 +394,25 @@ public String apply(Object param, Object... more) {
* @return substituted query
*/
public String apply(Object[] values) {
if (!hasParameter()) {
return originalQuery;
int len = values == null ? 0 : values.length;
if (len == 0) {
return apply(Collections.emptyMap());
}

int len = values == null ? 0 : values.length;
StringBuilder builder = new StringBuilder();
Map<String, String> map = new HashMap<>();
int index = 0;
for (QueryPart p : parts) {
builder.append(p.part);
builder.append(
index < len ? toSqlExpression(p.paramName, values[index]) : ClickHouseValues.NULL_EXPR);
for (Entry<String, ClickHouseValue> e : names.entrySet()) {
ClickHouseValue v = e.getValue();
if (index < len) {
map.put(e.getKey(), v != null ? v.update(values[index]).toSqlExpression()
: ClickHouseValues.convertToSqlExpression(values[index]));
} else {
break;
}
index++;
}

if (lastPart != null) {
builder.append(lastPart);
}
return builder.toString();
return apply(map);
}

/**
Expand All @@ -410,22 +427,21 @@ public String apply(String param, String... more) {
return originalQuery;
}

int len = more == null ? 0 : more.length + 1;
StringBuilder builder = new StringBuilder();
int index = 0;
for (QueryPart p : parts) {
builder.append(p.part);
if (index > 0) {
param = index < len ? more[index - 1] : ClickHouseValues.NULL_EXPR;
int len = more == null ? 0 : more.length;
Map<String, String> map = new HashMap<>();
int index = -1;
for (String n : names.keySet()) {
if (index < 0) {
map.put(n, param);
} else if (index < len) {
map.put(n, more[index]);
} else {
break;
}
builder.append(param);
index++;
}

if (lastPart != null) {
builder.append(lastPart);
}
return builder.toString();
return apply(map);
}

/**
Expand All @@ -435,23 +451,23 @@ public String apply(String param, String... more) {
* @return substituted query
*/
public String apply(String[] values) {
if (!hasParameter()) {
return originalQuery;
int len = values == null ? 0 : values.length;
if (len == 0) {
return apply(Collections.emptyMap());
}

int len = values == null ? 0 : values.length;
StringBuilder builder = new StringBuilder();
Map<String, String> map = new HashMap<>();
int index = 0;
for (QueryPart p : parts) {
builder.append(p.part);
builder.append(index < len ? values[index] : ClickHouseValues.NULL_EXPR);
for (String n : names.keySet()) {
if (index < len) {
map.put(n, values[index]);
} else {
break;
}
index++;
}

if (lastPart != null) {
builder.append(lastPart);
}
return builder.toString();
return apply(map);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ public class ClickHouseThreadFactory implements ThreadFactory {
private final AtomicInteger threadNumber;

public ClickHouseThreadFactory(Object owner) {
this(owner, false, Thread.NORM_PRIORITY);
this(owner, true, Thread.NORM_PRIORITY);
}

public ClickHouseThreadFactory(Object owner, boolean daemon, int priority) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,11 +107,11 @@ private static <T> T findFirstService(Class<? extends T> serviceInterface) {
}

public static ExecutorService newThreadPool(Object owner, int maxThreads, int maxRequests) {
return newThreadPool(owner, maxThreads, 0, maxRequests, 0L);
return newThreadPool(owner, maxThreads, 0, maxRequests, 0L, true);
}

public static ExecutorService newThreadPool(Object owner, int coreThreads, int maxThreads, int maxRequests,
long keepAliveTimeoutMs) {
long keepAliveTimeoutMs, boolean allowCoreThreadTimeout) {
BlockingQueue<Runnable> queue = maxRequests > 0 ? new ArrayBlockingQueue<>(maxRequests)
: new LinkedBlockingQueue<>();
if (coreThreads < 2) {
Expand All @@ -120,8 +120,16 @@ public static ExecutorService newThreadPool(Object owner, int coreThreads, int m
if (maxThreads < coreThreads) {
maxThreads = coreThreads;
}
return new ThreadPoolExecutor(coreThreads, maxThreads, keepAliveTimeoutMs < 0L ? 0L : keepAliveTimeoutMs,
if (keepAliveTimeoutMs <= 0L) {
keepAliveTimeoutMs = allowCoreThreadTimeout ? 1000L : 0L;
}

ThreadPoolExecutor pool = new ThreadPoolExecutor(coreThreads, maxThreads, keepAliveTimeoutMs,
TimeUnit.MILLISECONDS, queue, new ClickHouseThreadFactory(owner), new ThreadPoolExecutor.AbortPolicy());
if (allowCoreThreadTimeout) {
pool.allowCoreThreadTimeOut(true);
}
return pool;
}

public static boolean isCloseBracket(char ch) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -505,11 +505,11 @@ public boolean hasNext() {

@Override
public ClickHouseRecord next() {
if (!hasNext()) {
ClickHouseRecord r = readNextRow();
if (r == null) {
throw new NoSuchElementException("No more record");
}

return readNextRow();
return r;
}
}

Expand Down
Loading

0 comments on commit 1f6666a

Please sign in to comment.