Skip to content

Commit

Permalink
fix for protocol
Browse files Browse the repository at this point in the history
  • Loading branch information
rernas35 committed Jul 16, 2022
1 parent ebde4af commit 6791f9b
Show file tree
Hide file tree
Showing 32 changed files with 124 additions and 332 deletions.

This file was deleted.

This file was deleted.

30 changes: 0 additions & 30 deletions clickhouse-r2dbc/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
<artifactId>clickhouse-r2dbc</artifactId>

<properties>
<revision>0.3.3-SNAPSHOT</revision>
<hikari-cp.version>4.0.3</hikari-cp.version>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
Expand All @@ -22,21 +21,8 @@
<testcontainers.clickhouse.version>1.16.3</testcontainers.clickhouse.version>
<docker-java-api.version>3.2.13</docker-java-api.version>
<docker-java-transport-zerodep.version>3.2.13</docker-java-transport-zerodep.version>
<surefire-plugin.version>3.0.0-M5</surefire-plugin.version>
</properties>

<dependencyManagement>
<dependencies>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-bom</artifactId>
<version>2020.0.16</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>

<dependencies>
<dependency>
<groupId>io.projectreactor</groupId>
Expand Down Expand Up @@ -115,21 +101,5 @@
</exclusion>
</exclusions>
</dependency>

</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>${surefire-plugin.version}</version>
<configuration>
<includes>
<include>%regex[.*Test*.*]</include>
</includes>
</configuration>
</plugin>
</plugins>
</build>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.clickhouse.client.ClickHouseFormat;
import com.clickhouse.client.ClickHouseRequest;
import com.clickhouse.client.config.ClickHouseDefaults;
import io.r2dbc.spi.Batch;
import io.r2dbc.spi.Result;
import org.reactivestreams.Publisher;
Expand All @@ -13,7 +14,7 @@

public class ClickHouseBatch implements Batch {

private static final ClickHouseFormat PREFERRED_FORMAT = ClickHouseFormat.RowBinaryWithNamesAndTypes;
private static final ClickHouseFormat PREFERRED_FORMAT = ClickHouseFormat.TabSeparatedWithNamesAndTypes;
private ClickHouseRequest<?> request;
List<String> sqlList = new ArrayList<>();

Expand All @@ -31,7 +32,8 @@ public Batch add(String sql) {
public Publisher<? extends Result> execute() {
return Flux.fromStream(sqlList.stream().map(sql -> {
request.query(sql).format(PREFERRED_FORMAT);
return Mono.fromFuture(request.execute());
}).map(ClickHouseResult::new));
return Mono.fromFuture(request::execute); }))
.flatMap(Mono::flux)
.map(ClickHouseResult::new);
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package com.clickhouse.r2dbc;

import com.clickhouse.client.ClickHouseColumn;
import com.clickhouse.r2dbc.types.TypeMapper;
import com.clickhouse.r2dbc.types.ClickHouseDataTypeWrapper;
import io.r2dbc.spi.ColumnMetadata;
import io.r2dbc.spi.Type;

Expand All @@ -12,7 +12,7 @@ public class ClickHouseColumnMetadata implements ColumnMetadata {

ClickHouseColumnMetadata(ClickHouseColumn col) {
this.name = col.getColumnName(); // TODO :check alias handling.
this.type = TypeMapper.getType(col.getDataType());
this.type = ClickHouseDataTypeWrapper.of(col.getDataType());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,14 @@ public class ClickHouseResult implements Result {
private final Mono<? extends Result.Segment> updatedCount;
private final Flux<? extends Result.Segment> segments;

ClickHouseResult(Mono<ClickHouseResponse> response) {
this.rowSegments = response.flux()
.flatMap(resp -> Flux
ClickHouseResult(ClickHouseResponse response) {
this.rowSegments = Mono.just(response)
.flatMapMany(resp -> Flux
.fromStream(StreamSupport.stream(resp.records().spliterator(), false)
.map(rec -> Pair.of(resp.getColumns(), rec))))
.map(pair -> new ClickHouseRow(pair.getRight(), pair.getLeft()))
.map(RowSegment::new);
this.updatedCount = response.map(ClickHouseResponse::getSummary)
this.updatedCount = Mono.just(response).map(ClickHouseResponse::getSummary)
.map(ClickHouseResponseSummary::getProgress)
.map(ClickHouseResponseSummary.Progress::getWrittenRows)
.map(UpdateCount::new);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class ClickHouseRowMetadata implements RowMetadata {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import com.clickhouse.client.ClickHouseRequest;
import com.clickhouse.client.ClickHouseResponse;
import com.clickhouse.client.config.ClickHouseClientOption;
import com.clickhouse.client.config.ClickHouseDefaults;
import com.clickhouse.client.logging.Logger;
import com.clickhouse.client.logging.LoggerFactory;
import io.r2dbc.spi.Blob;
Expand All @@ -16,14 +17,13 @@

import java.util.List;
import java.util.NoSuchElementException;
import java.util.regex.Pattern;
import java.util.stream.Stream;

public class ClickHouseStatement implements Statement {

private static final Logger log = LoggerFactory.getLogger(ClickHouseStatement.class);

private static final ClickHouseFormat PREFERRED_FORMAT = ClickHouseFormat.RowBinaryWithNamesAndTypes;
private static final ClickHouseFormat PREFERRED_FORMAT = ClickHouseFormat.TabSeparatedWithNamesAndTypes;
private static final String NULL_VALUES_ARE_NOT_ALLOWED_AS_VALUE = "null values are not allowed as value.";
private static final String CLASS_TYPES_ARE_NOT_ALLOWED_AS_VALUE = "class types are not allowed as value.";
private static final String INVALID_PARAMETER_INDEX = "Invalid parameter index! Parameter index must be greater than 0.";
Expand All @@ -42,7 +42,9 @@ public class ClickHouseStatement implements Statement {
private int fetchSize;

public ClickHouseStatement(String sql, ClickHouseRequest<?> request) {
this.request = request.format(PREFERRED_FORMAT).query(sql);
this.request = request
.format(PREFERRED_FORMAT)
.query(sql);
namedParameters = request.getPreparedQuery().getParameters();
bindings = new ClickHouseStatementBinding(namedParameters.size());
}
Expand Down Expand Up @@ -133,9 +135,8 @@ public Flux<? extends Result> execute() {
request.option(ClickHouseClientOption.MAX_RESULT_ROWS, fetchSize);
}
if (boundList.isEmpty()) {
return Flux.from(Mono.fromFuture(request::execute))
.map(Mono::just)
.map(ClickHouseResult::new);
return Flux.from(Mono.fromFuture(request::execute)
.map(ClickHouseResult::new));
} else {
Stream<Mono<ClickHouseResponse>> monoStream = boundList.stream().map(binding -> {
for (int i = 0; i < binding.values.length; i++ ) {
Expand All @@ -146,7 +147,9 @@ public Flux<? extends Result> execute() {
request.params(binding.values);
return Mono.fromFuture(request::execute);
});
return Flux.fromStream(monoStream).map(ClickHouseResult::new);
return Flux.fromStream(monoStream)
.flatMap(Mono::flux)
.map(ClickHouseResult::new);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@

import java.time.Duration;

import static reactor.core.publisher.Mono.just;

public class ClickHouseConnection implements Connection {

private static final Logger log = LoggerFactory.getLogger(ClickHouseConnection.class);
Expand All @@ -30,8 +32,8 @@ public class ClickHouseConnection implements Connection {
final ClickHouseNode server;
private boolean closed = false;

ClickHouseConnection(ClickHouseNode server, ClickHouseProtocol preferredProtocol) {
this.client = ClickHouseClient.newInstance(preferredProtocol);
ClickHouseConnection(ClickHouseNode server) {
this.client = ClickHouseClient.newInstance(server.getProtocol());
this.server = server;
}

Expand Down Expand Up @@ -81,6 +83,9 @@ public Publisher<Void> commitTransaction() {
@Override
public Batch createBatch() {
ClickHouseRequest<?> req = client.connect(server);
if (isHttp()) {
req = req.set("send_progress_in_http_headers", 1);
}
req.option(ClickHouseClientOption.ASYNC, true);
return new ClickHouseBatch(req);
}
Expand All @@ -97,10 +102,17 @@ public Publisher<Void> createSavepoint(String s) {
@Override
public Statement createStatement(String sql) {
ClickHouseRequest<?> req = client.connect(server);
if (isHttp()) {
req = req.set("send_progress_in_http_headers", 1);
}
req.option(ClickHouseClientOption.ASYNC, true);
return new ClickHouseStatement(sql, req);
}

private boolean isHttp() {
return server.getProtocol() == ClickHouseProtocol.HTTP;
}

/**
* Returns true since there is no transaction support.
* @return true
Expand Down Expand Up @@ -175,9 +187,9 @@ public Mono<Void> setTransactionIsolationLevel(IsolationLevel isolationLevel) {
@Override
public Publisher<Boolean> validate(ValidationDepth validationDepth) {
if (validationDepth == ValidationDepth.REMOTE) {
return Mono.just(client.ping(server, DEFAULT_TIMEOUT_FOR_CONNECTION_HEALTH_CHECK));
return closed ? just(false) : just(client.ping(server, DEFAULT_TIMEOUT_FOR_CONNECTION_HEALTH_CHECK));
} else { // validationDepth.LOCAL
return Mono.just(client != null && !closed);
return just(client != null && !closed);
}
}
}
Loading

0 comments on commit 6791f9b

Please sign in to comment.