From d9a6f24aa0a84161d5233387daa735abb37538ed Mon Sep 17 00:00:00 2001 From: rernas35 Date: Mon, 28 Feb 2022 21:54:53 +0100 Subject: [PATCH] first version of r2dbc --- clickhouse-r2dbc/pom.xml | 132 ++ .../com/clickhouse/r2dbc/ClickHouseBatch.java | 37 + .../r2dbc/ClickHouseColumnMetadata.java | 27 + .../clickhouse/r2dbc/ClickHouseResult.java | 124 ++ .../com/clickhouse/r2dbc/ClickHouseRow.java | 142 ++ .../r2dbc/ClickHouseRowMetadata.java | 43 + .../clickhouse/r2dbc/ClickHouseStatement.java | 140 ++ .../r2dbc/ClickHouseStatementBinding.java | 60 + .../connection/ClickHouseConnection.java | 188 +++ .../ClickHouseConnectionFactory.java | 42 + .../ClickHouseConnectionFactoryMetadata.java | 13 + .../ClickHouseConnectionFactoryProvider.java | 41 + .../ClickHouseConnectionMetadata.java | 51 + .../r2dbc/types/ClickHouseType.java | 52 + .../clickhouse/r2dbc/types/TypeMapper.java | 53 + .../io.r2dbc.spi.ConnectionFactoryProvider | 1 + .../r2dbc/spit/test/R2DBCTestKitImpl.java | 316 ++++ .../config.d/docker_related_config.xml | 12 + .../clickhouse-docker-mount/config.xml | 1294 +++++++++++++++++ .../docker_related_config.xml | 12 + .../clickhouse-docker-mount/users.xml | 123 ++ pom.xml | 1 + 22 files changed, 2904 insertions(+) create mode 100644 clickhouse-r2dbc/pom.xml create mode 100644 clickhouse-r2dbc/src/main/java/com/clickhouse/r2dbc/ClickHouseBatch.java create mode 100644 clickhouse-r2dbc/src/main/java/com/clickhouse/r2dbc/ClickHouseColumnMetadata.java create mode 100644 clickhouse-r2dbc/src/main/java/com/clickhouse/r2dbc/ClickHouseResult.java create mode 100644 clickhouse-r2dbc/src/main/java/com/clickhouse/r2dbc/ClickHouseRow.java create mode 100644 clickhouse-r2dbc/src/main/java/com/clickhouse/r2dbc/ClickHouseRowMetadata.java create mode 100644 clickhouse-r2dbc/src/main/java/com/clickhouse/r2dbc/ClickHouseStatement.java create mode 100644 clickhouse-r2dbc/src/main/java/com/clickhouse/r2dbc/ClickHouseStatementBinding.java create mode 100644 clickhouse-r2dbc/src/main/java/com/clickhouse/r2dbc/connection/ClickHouseConnection.java create mode 100644 clickhouse-r2dbc/src/main/java/com/clickhouse/r2dbc/connection/ClickHouseConnectionFactory.java create mode 100644 clickhouse-r2dbc/src/main/java/com/clickhouse/r2dbc/connection/ClickHouseConnectionFactoryMetadata.java create mode 100644 clickhouse-r2dbc/src/main/java/com/clickhouse/r2dbc/connection/ClickHouseConnectionFactoryProvider.java create mode 100644 clickhouse-r2dbc/src/main/java/com/clickhouse/r2dbc/connection/ClickHouseConnectionMetadata.java create mode 100644 clickhouse-r2dbc/src/main/java/com/clickhouse/r2dbc/types/ClickHouseType.java create mode 100644 clickhouse-r2dbc/src/main/java/com/clickhouse/r2dbc/types/TypeMapper.java create mode 100644 clickhouse-r2dbc/src/main/resources/META-INF/services/io.r2dbc.spi.ConnectionFactoryProvider create mode 100644 clickhouse-r2dbc/src/test/java/com/clickhouse/r2dbc/spit/test/R2DBCTestKitImpl.java create mode 100644 clickhouse-r2dbc/src/test/resources/clickhouse-docker-mount/config.d/docker_related_config.xml create mode 100644 clickhouse-r2dbc/src/test/resources/clickhouse-docker-mount/config.xml create mode 100644 clickhouse-r2dbc/src/test/resources/clickhouse-docker-mount/docker_related_config.xml create mode 100644 clickhouse-r2dbc/src/test/resources/clickhouse-docker-mount/users.xml diff --git a/clickhouse-r2dbc/pom.xml b/clickhouse-r2dbc/pom.xml new file mode 100644 index 000000000..13594795d --- /dev/null +++ b/clickhouse-r2dbc/pom.xml @@ -0,0 +1,132 @@ + + + + clickhouse-java + 
com.clickhouse + 0.3.2-SNAPSHOT + + 4.0.0 + + clickhouse-r2dbc + + + 4.0.3 + 1.8 + 1.8 + 0.9.1.RELEASE + 3.12.0 + 1.16.3 + 1.16.3 + 3.2.13 + 3.2.13 + 3.0.0-M5 + + + + + + io.projectreactor + reactor-bom + 2020.0.16 + pom + import + + + + + + + io.projectreactor + reactor-core + + + io.r2dbc + r2dbc-spi + ${r2dbc-spi.version} + + + io.r2dbc + r2dbc-spi-test + ${r2dbc-spi.version} + + + com.clickhouse + clickhouse-client + ${project.version} + + + com.clickhouse + clickhouse-grpc-client + ${project.version} + + + com.clickhouse + clickhouse-http-client + ${project.version} + + + com.clickhouse + clickhouse-jdbc + ${project.version} + + + org.apache.commons + commons-lang3 + ${commons-lang3.version} + + + org.testcontainers + elasticsearch + ${testcontainers.elasticsearch.version} + test + + + org.testcontainers + clickhouse + ${testcontainers.clickhouse.version} + test + + + com.github.docker-java + docker-java-api + ${docker-java-api.version} + test + + + com.github.docker-java + docker-java-transport-zerodep + ${docker-java-transport-zerodep.version} + test + + + com.zaxxer + HikariCP + ${hikari-cp.version} + test + + + org.slf4j + slf4j-api + + + + + + + + + + org.apache.maven.plugins + maven-surefire-plugin + ${surefire-plugin.version} + + + %regex[.*Test*.*] + + + + + + \ No newline at end of file diff --git a/clickhouse-r2dbc/src/main/java/com/clickhouse/r2dbc/ClickHouseBatch.java b/clickhouse-r2dbc/src/main/java/com/clickhouse/r2dbc/ClickHouseBatch.java new file mode 100644 index 000000000..068efcf38 --- /dev/null +++ b/clickhouse-r2dbc/src/main/java/com/clickhouse/r2dbc/ClickHouseBatch.java @@ -0,0 +1,37 @@ +package com.clickhouse.r2dbc; + +import com.clickhouse.client.ClickHouseFormat; +import com.clickhouse.client.ClickHouseRequest; +import io.r2dbc.spi.Batch; +import io.r2dbc.spi.Result; +import org.reactivestreams.Publisher; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import java.util.ArrayList; +import java.util.List; + +public class ClickHouseBatch implements Batch { + + private static final ClickHouseFormat PREFERRED_FORMAT = ClickHouseFormat.RowBinaryWithNamesAndTypes; + private ClickHouseRequest request; + List sqlList = new ArrayList<>(); + + public ClickHouseBatch(ClickHouseRequest request) { + this.request = request; + } + + @Override + public Batch add(String sql) { + sqlList.add(sql); + return this; + } + + @Override + public Publisher execute() { + return Flux.fromStream(sqlList.stream().map(sql -> { + request.query(sql).format(PREFERRED_FORMAT); + return Mono.fromFuture(request.execute()); + }).map(ClickHouseResult::new)); + } +} diff --git a/clickhouse-r2dbc/src/main/java/com/clickhouse/r2dbc/ClickHouseColumnMetadata.java b/clickhouse-r2dbc/src/main/java/com/clickhouse/r2dbc/ClickHouseColumnMetadata.java new file mode 100644 index 000000000..59408657c --- /dev/null +++ b/clickhouse-r2dbc/src/main/java/com/clickhouse/r2dbc/ClickHouseColumnMetadata.java @@ -0,0 +1,27 @@ +package com.clickhouse.r2dbc; + +import com.clickhouse.client.ClickHouseColumn; +import com.clickhouse.r2dbc.types.TypeMapper; +import io.r2dbc.spi.ColumnMetadata; +import io.r2dbc.spi.Type; + +public class ClickHouseColumnMetadata implements ColumnMetadata { + + final Type type; + final String name; + + ClickHouseColumnMetadata(ClickHouseColumn col) { + this.name = col.getColumnName(); // TODO :check alias handling. 
+ this.type = TypeMapper.getType(col.getDataType()); + } + + @Override + public Type getType() { + return type; + } + + @Override + public String getName() { + return name; + } +} diff --git a/clickhouse-r2dbc/src/main/java/com/clickhouse/r2dbc/ClickHouseResult.java b/clickhouse-r2dbc/src/main/java/com/clickhouse/r2dbc/ClickHouseResult.java new file mode 100644 index 000000000..5cb3b4a1b --- /dev/null +++ b/clickhouse-r2dbc/src/main/java/com/clickhouse/r2dbc/ClickHouseResult.java @@ -0,0 +1,124 @@ +package com.clickhouse.r2dbc; + +import com.clickhouse.client.ClickHouseResponse; +import com.clickhouse.client.ClickHouseResponseSummary; +import com.clickhouse.client.logging.Logger; +import com.clickhouse.client.logging.LoggerFactory; +import io.r2dbc.spi.Result; +import io.r2dbc.spi.Row; +import io.r2dbc.spi.RowMetadata; +import org.apache.commons.lang3.tuple.Pair; +import org.reactivestreams.Publisher; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import java.util.function.BiFunction; +import java.util.function.Function; +import java.util.function.Predicate; +import java.util.stream.StreamSupport; + +public class ClickHouseResult implements Result { + + private static final Logger log = LoggerFactory.getLogger(ClickHouseResult.class); + + private final Flux rowSegments; + private final Mono updatedCount; + private final Flux segments; + + ClickHouseResult(Mono response) { + this.rowSegments = response.flux() + .flatMap(resp -> Flux + .fromStream(StreamSupport.stream(resp.records().spliterator(), false) + .map(rec -> Pair.of(resp.getColumns(), rec)))) + .map(pair -> new ClickHouseRow(pair.getRight(), pair.getLeft())) + .map(RowSegment::new); + this.updatedCount = response.map(clickHouseResponse -> clickHouseResponse.getSummary()) + .map(ClickHouseResponseSummary::getProgress) + .map(ClickHouseResponseSummary.Progress::getWrittenRows) + .map(UpdateCount::new); + this.segments = Flux.concat(this.updatedCount, this.rowSegments); + } + + ClickHouseResult(Flux rowSegments, Mono updatedCount) { + this.rowSegments = rowSegments; + this.updatedCount = updatedCount; + this.segments = Flux.concat(this.updatedCount, this.rowSegments); + } + + /** + * Returns updated count(written rows from summary of {@link ClickHouseResponse}).Important! if writtenRows is greater than MAX_INT then it will return MAX_INT. 
+ * @return + */ + @Override + public Mono getRowsUpdated() { + + return updatedCount.map(val -> { + UpdateCount updateCount = (UpdateCount) val; + if (updateCount.value() > Integer.MAX_VALUE) + return Integer.MAX_VALUE; + else return Long.valueOf(updateCount.value()).intValue(); + }); + } + + @Override + public Publisher map(BiFunction biFunction) { + return rowSegments.cast(RowSegment.class) + .map(RowSegment::row).handle((row, sink) -> { + try { + sink.next(biFunction.apply(row, row.getMetadata())); + } catch (Exception e) { + log.error("Provided function caused exception:", e); + } + }); + } + + @Override + public Result filter(Predicate predicate) { + return new ClickHouseResult(segments.filter(predicate), updatedCount.filter(predicate)); + } + + @Override + public Publisher flatMap(Function> function) { + return segments.flatMap(segment -> { + try { + Publisher retValue = function.apply(segment); + if (retValue == null) { + return Mono.error(new IllegalStateException("flatmap function returned null value")); + } + return retValue; + } catch (Exception e) { + log.error("Provided function caused exception:", e); + return Mono.error(e); + } + }); + } + + + class RowSegment implements Result.RowSegment { + + final ClickHouseRow row; + + RowSegment(ClickHouseRow row) { + this.row = row; + } + + @Override + public Row row() { + return row; + } + } + + class UpdateCount implements Result.UpdateCount { + + final long updateCount; + + UpdateCount(long updateCount) { + this.updateCount = updateCount; + } + + @Override + public long value() { + return updateCount; + } + } +} diff --git a/clickhouse-r2dbc/src/main/java/com/clickhouse/r2dbc/ClickHouseRow.java b/clickhouse-r2dbc/src/main/java/com/clickhouse/r2dbc/ClickHouseRow.java new file mode 100644 index 000000000..aade0abe8 --- /dev/null +++ b/clickhouse-r2dbc/src/main/java/com/clickhouse/r2dbc/ClickHouseRow.java @@ -0,0 +1,142 @@ +package com.clickhouse.r2dbc; + +import com.clickhouse.client.ClickHouseColumn; +import com.clickhouse.client.ClickHouseRecord; +import io.r2dbc.spi.Row; +import io.r2dbc.spi.RowMetadata; + +import java.math.BigDecimal; +import java.math.BigInteger; +import java.net.Inet4Address; +import java.net.Inet6Address; +import java.time.Instant; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.NoSuchElementException; +import java.util.UUID; +import java.util.function.Function; +import java.util.stream.Collectors; + +public class ClickHouseRow implements Row { + + final ClickHouseRecord record; + final ClickHouseRowMetadata rowMetadata; + + ClickHouseRow(ClickHouseRecord record, List columnList) { + this.record = record; + this.rowMetadata = new ClickHouseRowMetadata(columnList.stream() + .map(ClickHouseColumnMetadata::new) + .collect(Collectors + .toMap(ClickHouseColumnMetadata::getName, + Function.identity(), + (v1,v2) -> v2, // since every key will be unique, won't need to merge so just overwrite with the latest one. 
+ LinkedHashMap::new))); + } + + @Override + public RowMetadata getMetadata() { + return rowMetadata; + } + + @Override + public T get(int i, Class aClass) { + if (aClass.equals(Boolean.class)) { + return aClass.cast(record.getValue(i).asBoolean()); + } else if (aClass.equals(Short.class)) { + return aClass.cast(record.getValue(i).asShort()); + } else if (aClass.equals(Integer.class)) { + return aClass.cast(record.getValue(i).asInteger()); + } else if (aClass.equals(Long.class)) { + return aClass.cast(record.getValue(i).asLong()); + } else if (aClass.equals(Double.class)) { + return aClass.cast(record.getValue(i).asDouble()); + } else if (aClass.equals(Float.class)) { + return aClass.cast(record.getValue(i).asFloat()); + } else if (aClass.equals(BigDecimal.class)) { + return aClass.cast(record.getValue(i).asBigDecimal()); + } else if (aClass.equals(BigInteger.class)) { + return aClass.cast(record.getValue(i).asBigInteger()); + } else if (aClass.equals(LocalDate.class)) { + return aClass.cast(record.getValue(i).asDate()); + } else if (aClass.equals(LocalDateTime.class)) { + return aClass.cast(record.getValue(i).asDateTime()); + } else if (aClass.equals(Instant.class)) { + return aClass.cast(record.getValue(i).asInstant()); + } else if (aClass.equals(LocalTime.class)) { + return aClass.cast(record.getValue(i).asTime()); + } else if (aClass.equals(byte[].class)) { + return aClass.cast(record.getValue(i).asBinary()); + } else if (aClass.equals(Object[].class)) { + return aClass.cast(record.getValue(i).asArray()); + } else if (aClass.equals(Inet4Address.class)) { + return aClass.cast(record.getValue(i).asInet4Address()); + } else if (aClass.equals(Inet6Address.class)) { + return aClass.cast(record.getValue(i).asInet6Address()); + } else if (aClass.equals(Map.class)) { + return aClass.cast(record.getValue(i).asMap()); + } else if (aClass.equals(UUID.class)) { + return aClass.cast(record.getValue(i).asUuid()); + } else if (aClass.equals(List.class)) { + return aClass.cast(record.getValue(i).asTuple()); + } else if (aClass.equals(String.class)) { + return aClass.cast(record.getValue(i).asString()); + } else { + return aClass.cast(record.getValue(i).asObject()); + } + } + + @Override + public T get(String name, Class aClass) { + if (aClass.equals(Boolean.class)) { + return aClass.cast(record.getValue(name).asBoolean()); + } else if (aClass.equals(Short.class)) { + return aClass.cast(record.getValue(name).asShort()); + } else if (aClass.equals(Integer.class)) { + return aClass.cast(record.getValue(name).asInteger()); + } else if (aClass.equals(Long.class)) { + return aClass.cast(record.getValue(name).asLong()); + } else if (aClass.equals(Double.class)) { + return aClass.cast(record.getValue(name).asDouble()); + } else if (aClass.equals(Float.class)) { + return aClass.cast(record.getValue(name).asFloat()); + } else if (aClass.equals(BigDecimal.class)) { + return aClass.cast(record.getValue(name).asBigDecimal()); + } else if (aClass.equals(BigInteger.class)) { + return aClass.cast(record.getValue(name).asBigInteger()); + } else if (aClass.equals(LocalDate.class)) { + return aClass.cast(record.getValue(name).asDate()); + } else if (aClass.equals(LocalDateTime.class)) { + return aClass.cast(record.getValue(name).asDateTime()); + } else if (aClass.equals(Instant.class)) { + return aClass.cast(record.getValue(name).asInstant()); + } else if (aClass.equals(LocalTime.class)) { + return aClass.cast(record.getValue(name).asTime()); + } else if (aClass.equals(byte[].class)) { + return 
aClass.cast(record.getValue(name).asBinary()); + } else if (aClass.equals(Object[].class)) { + return aClass.cast(record.getValue(name).asArray()); + } else if (aClass.equals(Inet4Address.class)) { + return aClass.cast(record.getValue(name).asInet4Address()); + } else if (aClass.equals(Inet6Address.class)) { + return aClass.cast(record.getValue(name).asInet6Address()); + } else if (aClass.equals(Map.class)) { + return aClass.cast(record.getValue(name).asMap()); + } else if (aClass.equals(UUID.class)) { + return aClass.cast(record.getValue(name).asUuid()); + } else if (aClass.equals(List.class)) { + return aClass.cast(record.getValue(name).asTuple()); + } else if (aClass.equals(String.class)) { + return aClass.cast(record.getValue(name).asString()); + } else { + try { + return aClass.cast(record.getValue(name).asObject()); + } catch (IllegalArgumentException e) { + throw new NoSuchElementException(String.format("Unknown element with a name %s", name)); + } + } + } +} diff --git a/clickhouse-r2dbc/src/main/java/com/clickhouse/r2dbc/ClickHouseRowMetadata.java b/clickhouse-r2dbc/src/main/java/com/clickhouse/r2dbc/ClickHouseRowMetadata.java new file mode 100644 index 000000000..52a4bb785 --- /dev/null +++ b/clickhouse-r2dbc/src/main/java/com/clickhouse/r2dbc/ClickHouseRowMetadata.java @@ -0,0 +1,43 @@ +package com.clickhouse.r2dbc; + +import io.r2dbc.spi.ColumnMetadata; +import io.r2dbc.spi.RowMetadata; + +import java.util.Collection; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +public class ClickHouseRowMetadata implements RowMetadata { + + LinkedHashMap columnNameMetadataMap; + + ClickHouseRowMetadata( LinkedHashMap columnNameMetadataMap) { + this.columnNameMetadataMap = columnNameMetadataMap; + } + + @Override + public ColumnMetadata getColumnMetadata(int i) { + if (i > columnNameMetadataMap.size()) + throw new IllegalArgumentException("Given index is greater than size column metadata array."); + return columnNameMetadataMap.entrySet().stream().skip(i-1).findFirst().get().getValue(); + } + + @Override + public ColumnMetadata getColumnMetadata(String columnName) { + return columnNameMetadataMap.get(columnName); + } + + @Override + public List getColumnMetadatas() { + return Collections.unmodifiableList(columnNameMetadataMap.entrySet().stream() + .map(Map.Entry::getValue).collect(Collectors.toList())); + } + + @Override + public Collection getColumnNames() { + return Collections.unmodifiableSet(columnNameMetadataMap.keySet()); + } +} diff --git a/clickhouse-r2dbc/src/main/java/com/clickhouse/r2dbc/ClickHouseStatement.java b/clickhouse-r2dbc/src/main/java/com/clickhouse/r2dbc/ClickHouseStatement.java new file mode 100644 index 000000000..931ad40b0 --- /dev/null +++ b/clickhouse-r2dbc/src/main/java/com/clickhouse/r2dbc/ClickHouseStatement.java @@ -0,0 +1,140 @@ +package com.clickhouse.r2dbc; + +import com.clickhouse.client.ClickHouseFormat; +import com.clickhouse.client.ClickHouseRequest; +import com.clickhouse.client.ClickHouseResponse; +import com.clickhouse.client.config.ClickHouseClientOption; +import com.clickhouse.client.logging.Logger; +import com.clickhouse.client.logging.LoggerFactory; +import io.r2dbc.spi.Blob; +import io.r2dbc.spi.Clob; +import io.r2dbc.spi.Result; +import io.r2dbc.spi.Statement; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import java.util.List; +import java.util.NoSuchElementException; +import java.util.stream.Stream; + +public 
class ClickHouseStatement implements Statement { + + private static final Logger log = LoggerFactory.getLogger(ClickHouseStatement.class); + + private static final ClickHouseFormat PREFERRED_FORMAT = ClickHouseFormat.RowBinaryWithNamesAndTypes; + private static final String NULL_VALUES_ARE_NOT_ALLOWED_AS_VALUE = "null values are not allowed as value."; + private static final String CLASS_TYPES_ARE_NOT_ALLOWED_AS_VALUE = "class types are not allowed as value."; + private static final String INVALID_IDENTIFIER_INDEX_IDENTIFIER_INDEX_MUST_BE_GREATER_THAN_0 = "Invalid identifier index! identifier index must be greater than 0."; + + private static final Object EXPLICITLY_SET_NULL_VALUE = new Object(); + + private final ClickHouseRequest request; + private final List namedParameters; + private ClickHouseStatementBinding bindings; + private int fetchSize; + + public ClickHouseStatement(String sql, ClickHouseRequest request) { + this.request = request.format(PREFERRED_FORMAT).query(sql); + namedParameters = request.getPreparedQuery().getParameters(); + bindings = new ClickHouseStatementBinding(namedParameters.size()); + } + + @Override + public Statement add() { + bindings.add(); + return this; + } + + @Override + public Statement bind(int identifierIndex, Object o) { + if (o == null) { + throw new IllegalArgumentException(NULL_VALUES_ARE_NOT_ALLOWED_AS_VALUE); + } else if (o instanceof Class) { + throw new IllegalArgumentException(CLASS_TYPES_ARE_NOT_ALLOWED_AS_VALUE); + } + if (identifierIndex < 0) { + throw new IllegalArgumentException(INVALID_IDENTIFIER_INDEX_IDENTIFIER_INDEX_MUST_BE_GREATER_THAN_0); + } + + bindings.addBinding(identifierIndex, safeValue(o)); + return this; + } + + private Object safeValue(Object o) { + if (o instanceof Blob) { + throw new IllegalArgumentException("Unsupported datatype: Blob"); + } else if (o instanceof Clob) { + throw new IllegalArgumentException("Unsupported datatype: Clob"); + } + return o; + } + + @Override + public Statement bind(String identifierName, Object o) { + if (o == null) { + throw new IllegalArgumentException(NULL_VALUES_ARE_NOT_ALLOWED_AS_VALUE); + } else if (o instanceof Class) { + throw new IllegalArgumentException(CLASS_TYPES_ARE_NOT_ALLOWED_AS_VALUE); + } + int index = namedParameters.indexOf(identifierName); + if (index < 0) { + throw new NoSuchElementException(String.format("non-existing identifier : %s", identifierName)); + } + bindings.addBinding(index, safeValue(o)); + return this; + } + + @Override + public Statement bindNull(int identifierIndex, Class aClass) { + if (identifierIndex < 0) { + throw new IllegalArgumentException(INVALID_IDENTIFIER_INDEX_IDENTIFIER_INDEX_MUST_BE_GREATER_THAN_0); + } + bindings.addBinding(identifierIndex, EXPLICITLY_SET_NULL_VALUE); + return this; + } + + @Override + public Statement bindNull(String identifierName, Class aClass) { + if (identifierName == null) { + throw new IllegalArgumentException("null values are not allowed as identifier name."); + } + bindings.addBinding(namedParameters.indexOf(identifierName), EXPLICITLY_SET_NULL_VALUE); + return this; + } + + @Override + public Statement fetchSize(int rows) { + this.fetchSize = rows; + return this; + } + + @Override + public Flux execute() { + List boundList = bindings.getBoundList(); + if (fetchSize > 0) { + log.debug("setting fetch size {}", fetchSize); + request.option(ClickHouseClientOption.MAX_RESULT_ROWS, fetchSize); + } + if (boundList.isEmpty()) { + return Flux.from(Mono.fromFuture(request::execute)) + .map(Mono::just) + 
.map(ClickHouseResult::new); + } else { + Stream> monoStream = boundList.stream().map(binding -> { + for (int i = 0; i < binding.values.length; i++ ) { + if (binding.values[i] == EXPLICITLY_SET_NULL_VALUE) { + binding.values[i] = null; + } + } + request.params(binding.values); + return Mono.fromFuture(request::execute); + }); + return Flux.fromStream(monoStream).map(ClickHouseResult::new); + } + } + + @Override + public Statement returnGeneratedValues(String... columns) { + throw new UnsupportedOperationException("Generated values can not be returned from Clickhouse database."); + } +} diff --git a/clickhouse-r2dbc/src/main/java/com/clickhouse/r2dbc/ClickHouseStatementBinding.java b/clickhouse-r2dbc/src/main/java/com/clickhouse/r2dbc/ClickHouseStatementBinding.java new file mode 100644 index 000000000..aa418255b --- /dev/null +++ b/clickhouse-r2dbc/src/main/java/com/clickhouse/r2dbc/ClickHouseStatementBinding.java @@ -0,0 +1,60 @@ +package com.clickhouse.r2dbc; + +import java.util.ArrayList; +import java.util.List; + +class ClickHouseStatementBinding { + List bindingList; + Binding current; + int size; + + ClickHouseStatementBinding(int size) { + this.size = size; + current = new Binding(size); + bindingList = new ArrayList<>(); + } + + void addBinding(int index, Object value) { + current.setParam(index, value); + } + + void add() { + if (current.isCompleted()) { + bindingList.add(current); + current = new Binding(size); + } + + } + + List getBoundList() { + List bindingList = (this.bindingList == null) ? new ArrayList<>() : this.bindingList; + if (current.values.length > 0 && current.isCompleted()) + bindingList.add(current); + return bindingList; + } + + public class Binding { + + Object[] values; + + private Binding(int size) { + values = new Object[size]; + } + + private void setParam(int index, Object value) { + values[index] = value; + } + + private boolean isCompleted(){ + for (Object value: values) { + if (value == null) throw new IllegalStateException("Not all identifiers are set."); + } + return true; + } + + public Object[] getValues() { + return values; + } + } +} + diff --git a/clickhouse-r2dbc/src/main/java/com/clickhouse/r2dbc/connection/ClickHouseConnection.java b/clickhouse-r2dbc/src/main/java/com/clickhouse/r2dbc/connection/ClickHouseConnection.java new file mode 100644 index 000000000..ae713a810 --- /dev/null +++ b/clickhouse-r2dbc/src/main/java/com/clickhouse/r2dbc/connection/ClickHouseConnection.java @@ -0,0 +1,188 @@ +package com.clickhouse.r2dbc.connection; + +import com.clickhouse.client.ClickHouseClient; +import com.clickhouse.client.ClickHouseNode; +import com.clickhouse.client.ClickHouseRequest; +import com.clickhouse.client.config.ClickHouseClientOption; +import com.clickhouse.client.logging.Logger; +import com.clickhouse.client.logging.LoggerFactory; +import com.clickhouse.r2dbc.ClickHouseBatch; +import com.clickhouse.r2dbc.ClickHouseStatement; +import io.r2dbc.spi.Batch; +import io.r2dbc.spi.Connection; +import io.r2dbc.spi.ConnectionMetadata; +import io.r2dbc.spi.IsolationLevel; +import io.r2dbc.spi.Statement; +import io.r2dbc.spi.TransactionDefinition; +import io.r2dbc.spi.ValidationDepth; +import org.reactivestreams.Publisher; +import reactor.core.publisher.Mono; + +import java.time.Duration; + +public class ClickHouseConnection implements Connection { + + private static final Logger log = LoggerFactory.getLogger(ClickHouseConnection.class); + + public static final int DEFAULT_TIMEOUT_FOR_CONNECTION_HEALTH_CHECK = 5000; + final ClickHouseClient client; + 
final ClickHouseNode server; + private boolean closed = false; + + ClickHouseConnection(ClickHouseClient client, ClickHouseNode server) { + this.client = client; + this.server = server; + } + + + /** + * Transactions are not supported so this is a no-op implementation, + * @return + */ + @Override + public Mono beginTransaction() { + log.debug("Clickhouse does not support transactions so skipping initialization of transaction."); + return Mono.empty(); + } + + /** + * Transactions are not supported so this is a no-op implementation, + * @return + */ + @Override + public Mono beginTransaction(TransactionDefinition transactionDefinition) { + log.debug("Clickhouse does not support transactions so skipping initialization of transaction."); + return Mono.empty(); + } + + @Override + public Publisher close() { + try { + client.close(); + closed = true; + return Mono.empty(); + } catch (Exception e) { + return Mono.error(e); + } + } + + /** + * Transactions are not supported so this is a no-op implementation, + * @return + */ + @Override + public Publisher commitTransaction() { + log.debug("Clickhouse does not support transactions so skipping commit of transaction."); + return Mono.empty(); + } + + /** + * Returns {@link ClickHouseBatch} for batching statements. + * @return + */ + @Override + public Batch createBatch() { + ClickHouseRequest req = client.connect(server); + req.option(ClickHouseClientOption.ASYNC, true); + return new ClickHouseBatch(req); + } + + /** + * Returns true since there is no transaction support. + * @return true + */ + @Override + public Publisher createSavepoint(String s) { + return Mono.empty(); + } + + @Override + public Statement createStatement(String sql) { + ClickHouseRequest req = client.connect(server); + req.option(ClickHouseClientOption.ASYNC, true); + return new ClickHouseStatement(sql, req); + } + + /** + * Returns true since there is no transaction support. + * @return true + */ + @Override + public boolean isAutoCommit() { + return true; + } + + + @Override + public ConnectionMetadata getMetadata() { + return new ClickHouseConnectionMetadata(client, server); + } + + /** + * + * @return + */ + @Override + public IsolationLevel getTransactionIsolationLevel() { + return IsolationLevel.READ_COMMITTED; + } + + @Override + public Publisher releaseSavepoint(String s) { + return null; + } + + /** + * Transactions are not supported so this is a no-op implementation, + * @return + */ + @Override + public Publisher rollbackTransaction() { + log.debug("Clickhouse does not support transactions so skipping rollback of transaction."); + return Mono.empty(); + } + + @Override + public Publisher rollbackTransactionToSavepoint(String s) { + return null; + } + + /** + * Transactions are not supported so this is a no-op implementation, + * @return + */ + @Override + public Publisher setAutoCommit(boolean b) { + log.debug("Clickhouse does not support transactions so skipping setting of transaction auto commit."); + return Mono.empty(); + } + + @Override + public Publisher setLockWaitTimeout(Duration duration) { + return null; + } + + @Override + public Publisher setStatementTimeout(Duration duration) { + return null; + } + + /** + * Since transactions are not supported, this method will throw exception. 
+ * @param isolationLevel + * @return + */ + @Override + public Mono setTransactionIsolationLevel(IsolationLevel isolationLevel) { + return Mono.error(new UnsupportedOperationException("Transaction isolation level can not be changed.")); + } + + @Override + public Publisher validate(ValidationDepth validationDepth) { + if (validationDepth == ValidationDepth.REMOTE) { + return Mono.just(client.ping(server, DEFAULT_TIMEOUT_FOR_CONNECTION_HEALTH_CHECK)); + } else { // validationDepth.LOCAL + return Mono.just(client != null && !closed); + } + } +} diff --git a/clickhouse-r2dbc/src/main/java/com/clickhouse/r2dbc/connection/ClickHouseConnectionFactory.java b/clickhouse-r2dbc/src/main/java/com/clickhouse/r2dbc/connection/ClickHouseConnectionFactory.java new file mode 100644 index 000000000..50a0552b3 --- /dev/null +++ b/clickhouse-r2dbc/src/main/java/com/clickhouse/r2dbc/connection/ClickHouseConnectionFactory.java @@ -0,0 +1,42 @@ +package com.clickhouse.r2dbc.connection; + +import com.clickhouse.client.ClickHouseClient; +import com.clickhouse.client.ClickHouseNode; +import com.clickhouse.client.ClickHouseProtocol; +import io.r2dbc.spi.Connection; +import io.r2dbc.spi.ConnectionFactory; +import io.r2dbc.spi.ConnectionFactoryMetadata; +import reactor.core.publisher.Mono; + +public class ClickHouseConnectionFactory implements ConnectionFactory { + + Mono client; + + private ClickHouseConnectionFactory(Mono client) { + this.client = client; + } + + public static ClickHouseConnectionFactory from(ClickHouseNode.Builder builder, String protocol) { + ClickHouseNode server = builder.build(); + ClickHouseProtocol preferredProtocol = null; + if (protocol.equalsIgnoreCase("grpc")) { + preferredProtocol = ClickHouseProtocol.GRPC; + } else if (protocol.equalsIgnoreCase("http")) { + preferredProtocol = ClickHouseProtocol.HTTP; + } else { + throw new IllegalArgumentException("Undefined protocol"); + } + ClickHouseClient client = ClickHouseClient.newInstance(preferredProtocol); + return new ClickHouseConnectionFactory(Mono.just(new ClickHouseConnection(client, server))); + } + + @Override + public Mono create() { + return client; + } + + @Override + public ConnectionFactoryMetadata getMetadata() { + return ClickHouseConnectionFactoryMetadata.INSTANCE; + } +} diff --git a/clickhouse-r2dbc/src/main/java/com/clickhouse/r2dbc/connection/ClickHouseConnectionFactoryMetadata.java b/clickhouse-r2dbc/src/main/java/com/clickhouse/r2dbc/connection/ClickHouseConnectionFactoryMetadata.java new file mode 100644 index 000000000..c8835a0c1 --- /dev/null +++ b/clickhouse-r2dbc/src/main/java/com/clickhouse/r2dbc/connection/ClickHouseConnectionFactoryMetadata.java @@ -0,0 +1,13 @@ +package com.clickhouse.r2dbc.connection; + +import io.r2dbc.spi.ConnectionFactoryMetadata; + +public class ClickHouseConnectionFactoryMetadata implements ConnectionFactoryMetadata { + + static final ClickHouseConnectionFactoryMetadata INSTANCE = new ClickHouseConnectionFactoryMetadata(); + + @Override + public String getName() { + return "ClickHouse"; + } +} diff --git a/clickhouse-r2dbc/src/main/java/com/clickhouse/r2dbc/connection/ClickHouseConnectionFactoryProvider.java b/clickhouse-r2dbc/src/main/java/com/clickhouse/r2dbc/connection/ClickHouseConnectionFactoryProvider.java new file mode 100644 index 000000000..d8222d922 --- /dev/null +++ b/clickhouse-r2dbc/src/main/java/com/clickhouse/r2dbc/connection/ClickHouseConnectionFactoryProvider.java @@ -0,0 +1,41 @@ +package com.clickhouse.r2dbc.connection; + +import 
com.clickhouse.client.ClickHouseCredentials; +import com.clickhouse.client.ClickHouseNode; +import io.r2dbc.spi.ConnectionFactory; +import io.r2dbc.spi.ConnectionFactoryOptions; + +import static io.r2dbc.spi.ConnectionFactoryOptions.*; + +public class ClickHouseConnectionFactoryProvider implements io.r2dbc.spi.ConnectionFactoryProvider { + + /** + * The name of the driver used for discovery, should not be changed. + */ + public static final String CLICKHOUSE_DRIVER = "clickhouse"; + + @Override + public ConnectionFactory create(ConnectionFactoryOptions cfOpt) { + ClickHouseNode.Builder nodeBuilder = ClickHouseNode.builder(); + nodeBuilder.host(cfOpt.getValue(HOST).toString()); + nodeBuilder.port((Integer) cfOpt.getValue(PORT)); + if (cfOpt.getValue(USER) != null) { + nodeBuilder.credentials(ClickHouseCredentials.fromUserAndPassword(cfOpt.getValue(USER).toString(), + cfOpt.getValue(PASSWORD).toString())); + } + return ClickHouseConnectionFactory.from(nodeBuilder, cfOpt.getValue(PROTOCOL).toString()); + } + + @Override + public boolean supports(ConnectionFactoryOptions connectionFactoryOptions) { + if (connectionFactoryOptions.getValue(DRIVER).equals(CLICKHOUSE_DRIVER)) { + return true; + } + return false; + } + + @Override + public String getDriver() { + return CLICKHOUSE_DRIVER; + } +} diff --git a/clickhouse-r2dbc/src/main/java/com/clickhouse/r2dbc/connection/ClickHouseConnectionMetadata.java b/clickhouse-r2dbc/src/main/java/com/clickhouse/r2dbc/connection/ClickHouseConnectionMetadata.java new file mode 100644 index 000000000..354398179 --- /dev/null +++ b/clickhouse-r2dbc/src/main/java/com/clickhouse/r2dbc/connection/ClickHouseConnectionMetadata.java @@ -0,0 +1,51 @@ +package com.clickhouse.r2dbc.connection; + +import com.clickhouse.client.ClickHouseClient; +import com.clickhouse.client.ClickHouseNode; +import com.clickhouse.client.ClickHouseResponse; +import com.clickhouse.client.logging.Logger; +import com.clickhouse.client.logging.LoggerFactory; +import io.r2dbc.spi.ConnectionMetadata; + +import java.util.concurrent.CompletableFuture; + +public class ClickHouseConnectionMetadata implements ConnectionMetadata { + + private static final Logger log = LoggerFactory.getLogger(ClickHouseConnectionMetadata.class); + + final ClickHouseClient client; + final ClickHouseNode server; + + private String serverVersion = null; + + ClickHouseConnectionMetadata(ClickHouseClient client, ClickHouseNode server) { + this.client = client; + this.server = server; + } + + @Override + public String getDatabaseProductName() { + return "Clickhouse"; + } + + /** + * Blocking operation. Queries server version by calling "SELECT version()" statement. 
+ * @return server version + */ + @Override + public String getDatabaseVersion() { + if (serverVersion != null) { + return serverVersion; + } + CompletableFuture responseCF = client.connect(server).query("SELECT version()").execute(); + // blocking here + ClickHouseResponse resp = null; + try { + resp = responseCF.get(); + } catch (Exception e) { + log.error("While fetching server version, error occured.", e); + return null; + } + return resp.records().iterator().next().getValue(0).asString(); + } +} diff --git a/clickhouse-r2dbc/src/main/java/com/clickhouse/r2dbc/types/ClickHouseType.java b/clickhouse-r2dbc/src/main/java/com/clickhouse/r2dbc/types/ClickHouseType.java new file mode 100644 index 000000000..0e3b80214 --- /dev/null +++ b/clickhouse-r2dbc/src/main/java/com/clickhouse/r2dbc/types/ClickHouseType.java @@ -0,0 +1,52 @@ +package com.clickhouse.r2dbc.types; + +import io.r2dbc.spi.Type; + +import java.math.BigDecimal; +import java.math.BigInteger; +import java.net.Inet4Address; +import java.net.Inet6Address; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.util.List; +import java.util.Map; +import java.util.UUID; + + +public enum ClickHouseType implements Type { + BOOLEAN_TYPE(Boolean.class, "Boolean"), + SHORT_TYPE(Short.class, "Short"), + INTEGER_TYPE(Integer.class, "Integer"), + LONG_TYPE(Long.class, "Integer"), + BIGINTEGER_TYPE(BigInteger.class, "BigInteger"), + DOUBLE_TYPE(Double.class, "Double"), + BIGDECIMAL_TYPE(BigDecimal.class, "BigDecimal"), + FLOAT_TYPE(Float.class, "Float"), + STRING_TYPE(String.class, "String"), + LOCALDATE_TYPE(LocalDate.class, "LocalDate"), + LOCALDATETIME_TYPE(LocalDateTime.class, "LocalDateTime"), + UUID_TYPE(UUID.class, "UUID"), + IPV4_TYPE(Inet4Address.class, "IPV4"), + IPV6_TYPE(Inet6Address.class, "IPV4"), + MAP_TYPE(Map.class, "Map"), + TUPLE_TYPE(List.class, "Tuple"), + BLOB(Byte[].class, "Byte Array"); + + ClickHouseType(Class javaType, String name) { + this.javaType = javaType; + this.name = name; + } + + Class javaType; + String name; + + @Override + public Class getJavaType() { + return javaType; + } + + @Override + public String getName() { + return name; + } +} diff --git a/clickhouse-r2dbc/src/main/java/com/clickhouse/r2dbc/types/TypeMapper.java b/clickhouse-r2dbc/src/main/java/com/clickhouse/r2dbc/types/TypeMapper.java new file mode 100644 index 000000000..2ca44e6d1 --- /dev/null +++ b/clickhouse-r2dbc/src/main/java/com/clickhouse/r2dbc/types/TypeMapper.java @@ -0,0 +1,53 @@ +package com.clickhouse.r2dbc.types; + +import com.clickhouse.client.ClickHouseDataType; +import io.r2dbc.spi.Type; + +public class TypeMapper { + + public static Type getType(ClickHouseDataType dataType) { + switch (dataType) { + case UInt8: return ClickHouseType.SHORT_TYPE; + case UInt16: + case Int8: + case Int16: + case Int32: return ClickHouseType.INTEGER_TYPE; + case IntervalSecond: + case IntervalMinute: + case IntervalHour: + case IntervalDay: + case IntervalWeek: + case IntervalMonth: + case IntervalQuarter: + case IntervalYear: + case UInt32: + case UInt64: + case Int64: return ClickHouseType.LONG_TYPE; + case Int128: + case Int256: + case UInt128: + case UInt256: return ClickHouseType.BIGINTEGER_TYPE; + case Float32: return ClickHouseType.FLOAT_TYPE; + case Float64: return ClickHouseType.DOUBLE_TYPE; + case Decimal: + case Decimal32: + case Decimal64: + case Decimal128: + case Decimal256: return ClickHouseType.BIGDECIMAL_TYPE; + case FixedString: + case String: return ClickHouseType.STRING_TYPE; + case Date: + case Date32: 
return ClickHouseType.LOCALDATE_TYPE; + case DateTime: + case DateTime32: + case DateTime64: return ClickHouseType.LOCALDATETIME_TYPE; + case UUID: return ClickHouseType.UUID_TYPE; + case IPv4: return ClickHouseType.IPV4_TYPE; + case IPv6: return ClickHouseType.IPV6_TYPE; + case Bool: return ClickHouseType.BOOLEAN_TYPE; + case Map: return ClickHouseType.MAP_TYPE; + case Tuple: return ClickHouseType.TUPLE_TYPE; + default: throw new IllegalArgumentException("Unhandled type of Clickhouse data type"); + } + } +} diff --git a/clickhouse-r2dbc/src/main/resources/META-INF/services/io.r2dbc.spi.ConnectionFactoryProvider b/clickhouse-r2dbc/src/main/resources/META-INF/services/io.r2dbc.spi.ConnectionFactoryProvider new file mode 100644 index 000000000..448a87cc4 --- /dev/null +++ b/clickhouse-r2dbc/src/main/resources/META-INF/services/io.r2dbc.spi.ConnectionFactoryProvider @@ -0,0 +1 @@ +com.clickhouse.r2dbc.connection.ClickHouseConnectionFactoryProvider diff --git a/clickhouse-r2dbc/src/test/java/com/clickhouse/r2dbc/spit/test/R2DBCTestKitImpl.java b/clickhouse-r2dbc/src/test/java/com/clickhouse/r2dbc/spit/test/R2DBCTestKitImpl.java new file mode 100644 index 000000000..2abb49bba --- /dev/null +++ b/clickhouse-r2dbc/src/test/java/com/clickhouse/r2dbc/spit/test/R2DBCTestKitImpl.java @@ -0,0 +1,316 @@ +package com.clickhouse.r2dbc.spit.test; + +import com.clickhouse.client.ClickHouseException; +import com.clickhouse.client.logging.Logger; +import com.clickhouse.client.logging.LoggerFactory; +import com.clickhouse.jdbc.ClickHouseDriver; +import com.clickhouse.r2dbc.ClickHouseStatement; +import com.zaxxer.hikari.HikariDataSource; +import io.r2dbc.spi.Blob; +import io.r2dbc.spi.Clob; +import io.r2dbc.spi.Connection; +import io.r2dbc.spi.ConnectionFactories; +import io.r2dbc.spi.ConnectionFactory; +import io.r2dbc.spi.Statement; +import io.r2dbc.spi.test.TestKit; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.Test; +import org.springframework.jdbc.core.JdbcOperations; +import org.springframework.jdbc.core.JdbcTemplate; +import org.testcontainers.containers.ClickHouseContainer; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.test.StepVerifier; + +import java.nio.ByteBuffer; +import java.sql.Driver; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.time.Duration; +import java.time.ZoneId; +import java.util.Arrays; +import java.util.Optional; +import java.util.TimeZone; + +import static java.lang.String.format; +import static org.junit.jupiter.api.Assertions.assertThrows; + +public class R2DBCTestKitImpl implements TestKit { + + private static final Logger log = LoggerFactory.getLogger(ClickHouseStatement.class); + + private static final ClickHouseContainer clickHouseContainer = new ClickHouseContainer("yandex/clickhouse-server"); + private static final String HOST = "localhost"; + private static final int GRPC_PORT = 9100; + private static final int HTTP_PORT = 8123; + private static final String DATABASE = "default"; + private static final String USER = "default"; + public static final String PASSWORD = ""; + ConnectionFactory connectionFactory = ConnectionFactories.get( + format("r2dbc:clickhouse:grpc://@%s:%d/%s", HOST, + clickHouseContainer.getMappedPort(GRPC_PORT), DATABASE)); + static JdbcTemplate jdbcTemplate; + + @BeforeAll + public static void setup() throws Exception { + String hostPath4Config = 
R2DBCTestKitImpl.class.getClassLoader().getResource("clickhouse-docker-mount").toURI().getPath(); + clickHouseContainer.withFileSystemBind(hostPath4Config,"/etc/clickhouse-server"); + clickHouseContainer.addExposedPort(9100); + clickHouseContainer.setPortBindings(Arrays.asList("0.0.0.0:" + GRPC_PORT + ":" + GRPC_PORT)); + clickHouseContainer.start(); + log.info("Exposed ports are : {}", clickHouseContainer.getExposedPorts()); + // jdbcTemplate(clickHouseContainer.getMappedPort(HTTP_PORT), null).execute(format("CREATE DATABASE %s [ENGINE = Atomic]", DATABASE)); + // log.info("Database with a name of {} is created", DATABASE); + jdbcTemplate = jdbcTemplate(clickHouseContainer.getMappedPort(HTTP_PORT), DATABASE); + } + + @Override + public ConnectionFactory getConnectionFactory() { + return connectionFactory; + } + + @Override + public String getPlaceholder(int i) { + return ":param" + i; + } + + @Override + public String getIdentifier(int i) { + return "param" + i; + } + + @Override + public JdbcOperations getJdbcOperations() { + return jdbcTemplate; + } + + private static JdbcTemplate jdbcTemplate(int port, String database) throws SQLException { + HikariDataSource source = new HikariDataSource(); + + Driver driver = new ClickHouseDriver(); + DriverManager.registerDriver(driver); + if (database == null) { + source.setJdbcUrl(format("jdbc:clickhouse://%s:%d", HOST, + port)); + } else { + source.setJdbcUrl(format("jdbc:clickhouse://%s:%d/%s", HOST, + port, DATABASE)); + } + + source.setUsername(USER); + source.setPassword(Optional.ofNullable(PASSWORD) + .map(Object::toString).orElse(null)); + source.setMaximumPoolSize(1); + source.setConnectionTimeout(Optional.ofNullable(Duration.ofSeconds(5)) + .map(Duration::toMillis).orElse(0L)); + + ZoneId zoneId = ZoneId.systemDefault(); + source.addDataSourceProperty("serverTimezone", TimeZone.getTimeZone(zoneId).getID()); + + return new JdbcTemplate(source); + } + + @Override + @Test + public void blobInsert() { + Flux.usingWhen(getConnectionFactory().create(), + connection -> { + + Statement statement = connection.createStatement(expand(TestStatement.INSERT_VALUE_PLACEHOLDER, getPlaceholder(0))); + assertThrows(IllegalArgumentException.class, () -> statement.bind(0, Blob.from(Mono.just(ByteBuffer.wrap("Unsupported type".getBytes())))), "bind(0, Blob) should fail"); + return Mono.empty(); + }, + Connection::close) + .as(StepVerifier::create) + .verifyComplete(); + } + + @Override + @Test + public void clobInsert() { + Flux.usingWhen(getConnectionFactory().create(), + connection -> { + + Statement statement = connection.createStatement(expand(TestStatement.INSERT_VALUE_PLACEHOLDER, getPlaceholder(0))); + assertThrows(IllegalArgumentException.class, () -> statement.bind(0, Clob.from(Mono.just("Unsupported type"))), "bind(0, Clob) should fail"); + return Mono.empty(); + }, + Connection::close) + .as(StepVerifier::create) + .verifyComplete(); + } + + @Override + @Disabled + public void blobSelect() { + // not supported + } + + @Override + @Disabled + public void clobSelect() { + // not supported + } + + @Override + @Test + public void columnMetadata() { + getJdbcOperations().execute(expand(TestStatement.INSERT_TWO_COLUMNS)); + + Flux.usingWhen(getConnectionFactory().create(), + connection -> Flux.from(connection + + .createStatement(expand(TestStatement.SELECT_VALUE_TWO_COLUMNS)) + .execute()), + Connection::close) + .as(StepVerifier::create) + .expectErrorMatches(ClickHouseException.class::isInstance) + .verify(); + } + + @Override + @Test + // TODO: check 
if it is doable. + public void compoundStatement() { + //compound statements are not supported by clickhouse. + getJdbcOperations().execute(expand(TestStatement.INSERT_VALUE100)); + + Flux.usingWhen(getConnectionFactory().create(), + connection -> Flux.from(connection + + .createStatement(expand(TestStatement.SELECT_VALUE_BATCH)) + .execute()), + Connection::close) + .as(StepVerifier::create) + .expectErrorMatches(ClickHouseException.class::isInstance) + .verify(); + } + + @Override + @Test + public void duplicateColumnNames() { + getJdbcOperations().execute(expand(TestStatement.INSERT_TWO_COLUMNS)); + + Flux.usingWhen(getConnectionFactory().create(), + connection -> Flux.from(connection + + .createStatement(expand(TestStatement.SELECT_VALUE_TWO_COLUMNS)) + .execute()), + Connection::close) + + .as(StepVerifier::create) + .expectErrorMatches(ClickHouseException.class::isInstance) + .verify(); + } + + @Override + @Test + public void returnGeneratedValues() { + getJdbcOperations().execute(expand(TestStatement.DROP_TABLE)); + getJdbcOperations().execute(getCreateTableWithAutogeneratedKey()); + Flux.usingWhen(getConnectionFactory().create(), + connection -> { + Statement statement = connection.createStatement(getInsertIntoWithAutogeneratedKey()); + + statement.returnGeneratedValues(); + + return Flux.from(statement + .execute()) + .flatMap(it -> it.map((row, rowMetadata) -> row.get(0))); + }, + Connection::close) + .as(StepVerifier::create) + .expectErrorMatches(UnsupportedOperationException.class::isInstance) + .verify(); + } + + @Override + @Test + public void returnGeneratedValuesFails() { + + Flux.usingWhen(getConnectionFactory().create(), + connection -> { + Statement statement = connection.createStatement(expand(TestStatement.INSERT_VALUE100)); + + assertThrows(UnsupportedOperationException.class, () -> statement.returnGeneratedValues((String[]) null)); + return Mono.empty(); + }, + Connection::close) + .as(StepVerifier::create) + .verifyComplete(); + } + + @Override + @Test + @Disabled + public void transactionRollback() { + // since there is not transaction support, this test case is disabled. + } + + @Override + @Test + @Disabled + public void sameAutoCommitLeavesTransactionUnchanged() { + // since there is not transaction support, this test case is disabled. + } + + @Override + @Test + @Disabled + public void savePoint() { + + } + + @Override + @Test + @Disabled + public void savePointStartsTransaction() { + + } + + @Override + public String expand(TestStatement statement, Object... 
args) { + try { + String sql = ClickHouseTestStatement.get(statement).getSql(); + return String.format(sql, args); + } catch (IllegalArgumentException e) { + return String.format(statement.getSql(), args); + } + } + + + private enum ClickHouseTestStatement { + CREATE_TABLE(TestStatement.CREATE_TABLE, "CREATE TABLE test ( test_value INTEGER ) ENGINE = Memory"), + CREATE_TABLE_TWO_COLUMNS(TestStatement.CREATE_TABLE_TWO_COLUMNS, "CREATE TABLE test_two_column ( col1 INTEGER, col2 VARCHAR(100) ) ENGINE = Memory"), + CREATE_BLOB_TABLE(TestStatement.CREATE_BLOB_TABLE, "CREATE TABLE blob_test ( test_value %s ) ENGINE = Memory"), + CREATE_CLOB_TABLE(TestStatement.CREATE_CLOB_TABLE, "CREATE TABLE clob_test ( test_value %s ) ENGINE = Memory"), + CREATE_TABLE_AUTOGENERATED_KEY(TestStatement.CREATE_TABLE_AUTOGENERATED_KEY, "CREATE TABLE test ( id DATE DEFAULT toDate(now()) , test_value INTEGER ) ENGINE = Memory"), + INSERT_VALUE_AUTOGENERATED_KEY(TestStatement.INSERT_VALUE_AUTOGENERATED_KEY, "INSERT INTO test(test_value) VALUES(100)"); + + + ClickHouseTestStatement(TestStatement testStatement, String sql) { + this.testStatementToBeOverwridden = testStatement; + this.sql = sql; + } + + TestStatement testStatementToBeOverwridden; + String sql; + + static ClickHouseTestStatement get(TestStatement testStatement) { + for (ClickHouseTestStatement cts : values()) { + if (cts.getTestStatementToBeOverwridden() == testStatement) + return cts; + } + throw new IllegalArgumentException("Teststatement is not found."); + } + + public String getSql() { + return sql; + } + + public TestStatement getTestStatementToBeOverwridden() { + return testStatementToBeOverwridden; + } + } +} diff --git a/clickhouse-r2dbc/src/test/resources/clickhouse-docker-mount/config.d/docker_related_config.xml b/clickhouse-r2dbc/src/test/resources/clickhouse-docker-mount/config.d/docker_related_config.xml new file mode 100644 index 000000000..3025dc269 --- /dev/null +++ b/clickhouse-r2dbc/src/test/resources/clickhouse-docker-mount/config.d/docker_related_config.xml @@ -0,0 +1,12 @@ + + + :: + 0.0.0.0 + 1 + + + diff --git a/clickhouse-r2dbc/src/test/resources/clickhouse-docker-mount/config.xml b/clickhouse-r2dbc/src/test/resources/clickhouse-docker-mount/config.xml new file mode 100644 index 000000000..7e103bd6b --- /dev/null +++ b/clickhouse-r2dbc/src/test/resources/clickhouse-docker-mount/config.xml @@ -0,0 +1,1294 @@ + + + + + + trace + /var/log/clickhouse-server/clickhouse-server.log + /var/log/clickhouse-server/clickhouse-server.err.log + + 1000M + 10 + + + + + + + + + + + + + + + + + + 8123 + + + 9000 + + + 9004 + + + 9005 + + + + + + + + + + + + 9009 + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + 4096 + + + 3 + + + 9100 + + false + + + /path/to/ssl_cert_file + /path/to/ssl_key_file + + + false + + + /path/to/ssl_ca_cert_file + + + deflate + + + medium + + + -1 + -1 + + + false + + + + + + + /etc/clickhouse-server/server.crt + /etc/clickhouse-server/server.key + + /etc/clickhouse-server/dhparam.pem + none + true + true + sslv2,sslv3 + true + + + + true + true + sslv2,sslv3 + true + + + + RejectCertificateHandler + + + + + + + + + 100 + + + 0 + + + + 10000 + + + + + + 0.9 + + + 4194304 + + + 0 + + + + + + 8589934592 + + + 5368709120 + + + + 1000 + + + 134217728 + + + 10000 + + + /var/lib/clickhouse/ + + + /var/lib/clickhouse/tmp/ + + + + + + /var/lib/clickhouse/user_files/ + + + + + + + + + + + + + users.xml + + + + /var/lib/clickhouse/access/ + + + + + + + default + + + + + + + + + + + + default + + + + + + + + + true + + + 
+        <!-- (elided, markup unrecoverable in this copy) remainder of the stock ClickHouse server config.xml bundled for the test container: clickhouse-jdbc-bridge installation notes, sample remote_servers / ZooKeeper / macros sections, the system log tables (query_log, trace_log, query_thread_log, query_views_log, part_log, metric_log, asynchronous_metric_log, opentelemetry_span_log, crash_log, session_log) writing to the system database, partitioned by toYYYYMM(event_date) and flushed on 1000-7500 ms intervals, dictionary and UDF config globs, distributed_ddl, graphite_rollup_example, format_schema_path, query_masking_rules and the send_crash_reports section -->
diff --git a/clickhouse-r2dbc/src/test/resources/clickhouse-docker-mount/docker_related_config.xml b/clickhouse-r2dbc/src/test/resources/clickhouse-docker-mount/docker_related_config.xml
new file mode 100644
index 000000000..3025dc269
--- /dev/null
+++ b/clickhouse-r2dbc/src/test/resources/clickhouse-docker-mount/docker_related_config.xml
@@ -0,0 +1,12 @@
+<!-- (elided, markup unrecoverable in this copy) Docker override config, identical to config.d/docker_related_config.xml above: listen_host set to :: and 0.0.0.0, listen_try 1 -->
diff --git a/clickhouse-r2dbc/src/test/resources/clickhouse-docker-mount/users.xml b/clickhouse-r2dbc/src/test/resources/clickhouse-docker-mount/users.xml
new file mode 100644
index 000000000..fd5fe4145
--- /dev/null
+++ b/clickhouse-r2dbc/src/test/resources/clickhouse-docker-mount/users.xml
@@ -0,0 +1,123 @@
+<!-- (elided, markup unrecoverable in this copy) default users.xml: a default profile (max_memory_usage 10000000000, load_balancing random), a readonly profile, the default user allowed from ::/0 with the default profile and quota, and a default quota with a 3600 s interval and unlimited counters -->
diff --git a/pom.xml b/pom.xml
index 2da2d1987..3ce37783c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -41,6 +41,7 @@
         <module>clickhouse-tcp-client</module>
         <module>clickhouse-jdbc</module>
         <module>clickhouse-benchmark</module>
+        <module>clickhouse-r2dbc</module>
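
A minimal usage sketch (not part of the patch): once the module above is on the classpath, the driver is discoverable through the META-INF/services entry it registers, so a plain R2DBC URL is enough to obtain a ConnectionFactory. The snippet below assumes the HTTP protocol path of ClickHouseConnectionFactory.from and mirrors the URL shape used in R2DBCTestKitImpl; the class name, host, port, database and query are placeholders.

// Usage sketch, not part of the patch: discovers the "clickhouse" driver registered by
// ClickHouseConnectionFactoryProvider and runs one parameterized query over HTTP.
// Host, port, database and the example query are placeholder values.
import io.r2dbc.spi.Connection;
import io.r2dbc.spi.ConnectionFactories;
import io.r2dbc.spi.ConnectionFactory;
import reactor.core.publisher.Flux;

public class ClickHouseR2dbcUsageSketch {
    public static void main(String[] args) {
        // URL format: r2dbc:clickhouse:<protocol>://<host>:<port>/<database>
        ConnectionFactory factory = ConnectionFactories.get(
                "r2dbc:clickhouse:http://localhost:8123/default");

        Flux.usingWhen(factory.create(),
                connection -> Flux.from(connection
                                // named parameters use the ":name" placeholder style
                                .createStatement("SELECT name FROM system.tables WHERE database = :db")
                                .bind("db", "system")
                                .execute())
                        // map each row to its first column as a String
                        .flatMap(result -> result.map((row, metadata) -> row.get(0, String.class))),
                Connection::close)
                .doOnNext(System.out::println)
                .blockLast();
    }
}

The same factory could also be built programmatically by passing ConnectionFactoryOptions (DRIVER, PROTOCOL, HOST, PORT, USER, PASSWORD) to ConnectionFactories.get instead of a URL, which is the path exercised by ClickHouseConnectionFactoryProvider.create.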