Skip to content

Commit

Permalink
Merge pull request #1124 from zhicwu/develop
Browse files Browse the repository at this point in the history
Remove deprecated code and refactor data processor
  • Loading branch information
zhicwu authored Nov 11, 2022
2 parents 5d9bcb9 + e74edf8 commit 5373ed7
Show file tree
Hide file tree
Showing 110 changed files with 8,669 additions and 2,499 deletions.
4 changes: 4 additions & 0 deletions clickhouse-benchmark/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,10 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.jctools</groupId>
<artifactId>jctools-core</artifactId>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ public void selectInt8(Blackhole blackhole, DriverState state) throws Throwable
public void selectUInt8(Blackhole blackhole, DriverState state) throws Throwable {
int num = state.getRandomNumber();
int rows = state.getSampleSize() + num;
ConsumeValueFunction func = state.getConsumeFunction((b, r, l, i) -> b.consume(r.getShort(i)));
ConsumeValueFunction func = state.getConsumeFunction((b, r, l, i) -> b.consume(r.getByte(i)));
int l = 0;
try (Statement stmt = executeQuery(state, "select toUInt8(number % 256) as v from numbers(?)", rows)) {
ResultSet rs = stmt.getResultSet();
Expand Down Expand Up @@ -167,6 +167,23 @@ public void selectInt32(Blackhole blackhole, DriverState state) throws Throwable
}
}

@Benchmark
public void selectUInt32(Blackhole blackhole, DriverState state) throws Throwable {
int num = state.getRandomNumber();
int rows = state.getSampleSize() + num;
ConsumeValueFunction func = state.getConsumeFunction((b, r, l, i) -> b.consume(r.getInt(i)));
int l = 0;
try (Statement stmt = executeQuery(state, "select toUInt32(number) as v from numbers(?)", rows)) {
ResultSet rs = stmt.getResultSet();
while (rs.next()) {
func.consume(blackhole, rs, l++, 1);
}
}
if (l != rows) {
throw new IllegalStateException(String.format(Locale.ROOT, "Expected %d rows but got %d", rows, l));
}
}

@Benchmark
public void selectString(Blackhole blackhole, DriverState state) throws Throwable {
int num = state.getRandomNumber();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
package com.clickhouse.benchmark.misc;

import java.io.Serializable;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.Threads;
import org.openjdk.jmh.annotations.Warmup;
import org.openjdk.jmh.infra.Blackhole;
import com.clickhouse.benchmark.BaseState;
import com.clickhouse.client.ClickHouseByteBuffer;
import com.clickhouse.client.ClickHouseClient;
import com.clickhouse.client.ClickHouseConfig;
import com.clickhouse.client.ClickHouseDataStreamFactory;
import com.clickhouse.client.ClickHouseInputStream;
import com.clickhouse.client.ClickHouseOutputStream;
import com.clickhouse.client.ClickHousePipedOutputStream;
import com.clickhouse.client.config.ClickHouseBufferingMode;
import com.clickhouse.client.config.ClickHouseClientOption;
import com.clickhouse.client.config.ClickHouseOption;
import com.clickhouse.client.data.BinaryStreamUtils;

@State(Scope.Benchmark)
@Warmup(iterations = 10, timeUnit = TimeUnit.SECONDS, time = 1)
@Measurement(iterations = 10, timeUnit = TimeUnit.SECONDS, time = 1)
@Fork(value = 2)
@Threads(value = -1)
@BenchmarkMode(Mode.Throughput)
@OutputTimeUnit(TimeUnit.SECONDS)
public class QueueBenchmark {
@State(Scope.Thread)
public static class CompareState extends BaseState {
public long samples;

@Setup(Level.Trial)
public void setupSamples() {
samples = 5000000L;
}

@Setup(Level.Iteration)
public void initValueClass() {
// ignore
}
}

@Benchmark
public void ideal(CompareState state, Blackhole consumer) {
long range = state.samples;
byte[] bytes = new byte[8];
ClickHouseByteBuffer buffer = ClickHouseByteBuffer.of(bytes);
for (long i = 0L; i < range; i++) {
BinaryStreamUtils.setInt64(bytes, 0, i);
consumer.consume(buffer.asLong());
buffer.update(bytes); // reset read position
}
}

@Benchmark
public void blocking(CompareState state, Blackhole consumer) throws Exception {
Map<ClickHouseOption, Serializable> options = new HashMap<>();
// options.put(ClickHouseClientOption.BUFFER_QUEUE_VARIATION, 0);
// options.put(ClickHouseClientOption.MAX_QUEUED_BUFFERS, 0);
// options.put(ClickHouseClientOption.SOCKET_TIMEOUT, 0);
options.put(ClickHouseClientOption.USE_BLOCKING_QUEUE, false);
final ClickHouseConfig config = new ClickHouseConfig(options);
final ClickHousePipedOutputStream stream = ClickHouseDataStreamFactory.getInstance().createPipedOutputStream(
config, null);
CompletableFuture<Long> future = ClickHouseClient.submit(() -> {
long range = state.samples;
try (ClickHouseOutputStream out = stream) {
for (long i = 0L; i < range; i++) {
BinaryStreamUtils.writeInt64(out, i);
}
}
return range;
});

try (ClickHouseInputStream input = stream.getInputStream()) {
consumer.consume(BinaryStreamUtils.readInt64(input));
}

consumer.consume(future.get());
}

@Benchmark
public void nonBlocking(CompareState state, Blackhole consumer) throws Exception {
final ClickHouseConfig config = new ClickHouseConfig(Collections
.singletonMap(ClickHouseClientOption.RESPONSE_BUFFERING, ClickHouseBufferingMode.PERFORMANCE));
final ClickHousePipedOutputStream stream = ClickHouseDataStreamFactory.getInstance().createPipedOutputStream(
config, null);
CompletableFuture<Long> future = ClickHouseClient.submit(() -> {
long range = state.samples;
try (ClickHouseOutputStream out = stream) {
for (long i = 0L; i < range; i++) {
BinaryStreamUtils.writeInt64(out, i);
}
}
return range;
});

try (ClickHouseInputStream input = stream.getInputStream()) {
consumer.consume(BinaryStreamUtils.readInt64(input));
}

consumer.consume(future.get());
}
}
20 changes: 20 additions & 0 deletions clickhouse-client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -42,16 +42,36 @@
<artifactId>dnsjava</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.jctools</groupId>
<artifactId>jctools-core</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.lz4</groupId>
<artifactId>lz4-java</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.msgpack</groupId>
<artifactId>msgpack-core</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.xerial.snappy</groupId>
<artifactId>snappy-java</artifactId>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.slf4j</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ public final ClickHouseConfig getConfig() {

@Override
public void init(ClickHouseConfig config) {
ClickHouseChecker.nonNull(config, "config");
ClickHouseChecker.nonNull(config, ClickHouseConfig.TYPE_NAME);

lock.writeLock().lock();
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ default ClickHouseArraySequence allocate(int length, Class<?> clazz) {
* Gets value at the specified position in this array.
*
* @param <V> type of the value
* @param index index which is greater or equal to zero and always smaller
* than {@link #length()}
* @param index index which is greater than or equal to zero and it's always
* smaller than {@link #length()}
* @param value non-null template object to retrieve the value
* @return non-null value which is same as {@code value}
*/
Expand All @@ -59,8 +59,8 @@ default ClickHouseArraySequence allocate(int length, Class<?> clazz) {
/**
* Sets value to the specified position in this array.
*
* @param index index which is greater or equal to zero and always smaller
* than {@link #length()}
* @param index index which is greater than or equal to zero and it's always
* smaller than {@link #length()}
* @param value non-null container of the value
* @return this value
*/
Expand Down
Loading

0 comments on commit 5373ed7

Please sign in to comment.