Skip to content

Commit

Permalink
first version of r2dbc
Browse files Browse the repository at this point in the history
  • Loading branch information
rernas35 committed May 4, 2022
1 parent e421799 commit d9a6f24
Show file tree
Hide file tree
Showing 22 changed files with 2,904 additions and 0 deletions.
132 changes: 132 additions & 0 deletions clickhouse-r2dbc/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>clickhouse-java</artifactId>
<groupId>com.clickhouse</groupId>
<version>0.3.2-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>clickhouse-r2dbc</artifactId>

<properties>
<hikari-cp.version>4.0.3</hikari-cp.version>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<r2dbc-spi.version>0.9.1.RELEASE</r2dbc-spi.version>
<commons-lang3.version>3.12.0</commons-lang3.version>
<testcontainers.elasticsearch.version>1.16.3</testcontainers.elasticsearch.version>
<testcontainers.clickhouse.version>1.16.3</testcontainers.clickhouse.version>
<docker-java-api.version>3.2.13</docker-java-api.version>
<docker-java-transport-zerodep.version>3.2.13</docker-java-transport-zerodep.version>
<surefire-plugin.version>3.0.0-M5</surefire-plugin.version>
</properties>

<dependencyManagement>
<dependencies>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-bom</artifactId>
<version>2020.0.16</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>

<dependencies>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
</dependency>
<dependency>
<groupId>io.r2dbc</groupId>
<artifactId>r2dbc-spi</artifactId>
<version>${r2dbc-spi.version}</version>
</dependency>
<dependency>
<groupId>io.r2dbc</groupId>
<artifactId>r2dbc-spi-test</artifactId>
<version>${r2dbc-spi.version}</version>
</dependency>
<dependency>
<groupId>com.clickhouse</groupId>
<artifactId>clickhouse-client</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.clickhouse</groupId>
<artifactId>clickhouse-grpc-client</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.clickhouse</groupId>
<artifactId>clickhouse-http-client</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.clickhouse</groupId>
<artifactId>clickhouse-jdbc</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>${commons-lang3.version}</version>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>elasticsearch</artifactId>
<version>${testcontainers.elasticsearch.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>clickhouse</artifactId>
<version>${testcontainers.clickhouse.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.github.docker-java</groupId>
<artifactId>docker-java-api</artifactId>
<version>${docker-java-api.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.github.docker-java</groupId>
<artifactId>docker-java-transport-zerodep</artifactId>
<version>${docker-java-transport-zerodep.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.zaxxer</groupId>
<artifactId>HikariCP</artifactId>
<version>${hikari-cp.version}</version>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
</exclusions>
</dependency>

</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>${surefire-plugin.version}</version>
<configuration>
<includes>
<include>%regex[.*Test*.*]</include>
</includes>
</configuration>
</plugin>
</plugins>
</build>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package com.clickhouse.r2dbc;

import com.clickhouse.client.ClickHouseFormat;
import com.clickhouse.client.ClickHouseRequest;
import io.r2dbc.spi.Batch;
import io.r2dbc.spi.Result;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.util.ArrayList;
import java.util.List;

public class ClickHouseBatch implements Batch {

private static final ClickHouseFormat PREFERRED_FORMAT = ClickHouseFormat.RowBinaryWithNamesAndTypes;
private ClickHouseRequest<?> request;
List<String> sqlList = new ArrayList<>();

public ClickHouseBatch(ClickHouseRequest request) {
this.request = request;
}

@Override
public Batch add(String sql) {
sqlList.add(sql);
return this;
}

@Override
public Publisher<? extends Result> execute() {
return Flux.fromStream(sqlList.stream().map(sql -> {
request.query(sql).format(PREFERRED_FORMAT);
return Mono.fromFuture(request.execute());
}).map(ClickHouseResult::new));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package com.clickhouse.r2dbc;

import com.clickhouse.client.ClickHouseColumn;
import com.clickhouse.r2dbc.types.TypeMapper;
import io.r2dbc.spi.ColumnMetadata;
import io.r2dbc.spi.Type;

public class ClickHouseColumnMetadata implements ColumnMetadata {

final Type type;
final String name;

ClickHouseColumnMetadata(ClickHouseColumn col) {
this.name = col.getColumnName(); // TODO :check alias handling.
this.type = TypeMapper.getType(col.getDataType());
}

@Override
public Type getType() {
return type;
}

@Override
public String getName() {
return name;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
package com.clickhouse.r2dbc;

import com.clickhouse.client.ClickHouseResponse;
import com.clickhouse.client.ClickHouseResponseSummary;
import com.clickhouse.client.logging.Logger;
import com.clickhouse.client.logging.LoggerFactory;
import io.r2dbc.spi.Result;
import io.r2dbc.spi.Row;
import io.r2dbc.spi.RowMetadata;
import org.apache.commons.lang3.tuple.Pair;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.StreamSupport;

public class ClickHouseResult implements Result {

private static final Logger log = LoggerFactory.getLogger(ClickHouseResult.class);

private final Flux<? extends Result.Segment> rowSegments;
private final Mono<? extends Result.Segment> updatedCount;
private final Flux<? extends Result.Segment> segments;

ClickHouseResult(Mono<ClickHouseResponse> response) {
this.rowSegments = response.flux()
.flatMap(resp -> Flux
.fromStream(StreamSupport.stream(resp.records().spliterator(), false)
.map(rec -> Pair.of(resp.getColumns(), rec))))
.map(pair -> new ClickHouseRow(pair.getRight(), pair.getLeft()))
.map(RowSegment::new);
this.updatedCount = response.map(clickHouseResponse -> clickHouseResponse.getSummary())
.map(ClickHouseResponseSummary::getProgress)
.map(ClickHouseResponseSummary.Progress::getWrittenRows)
.map(UpdateCount::new);
this.segments = Flux.concat(this.updatedCount, this.rowSegments);
}

ClickHouseResult(Flux<? extends Result.Segment> rowSegments, Mono<? extends Result.Segment> updatedCount) {
this.rowSegments = rowSegments;
this.updatedCount = updatedCount;
this.segments = Flux.concat(this.updatedCount, this.rowSegments);
}

/**
* Returns updated count(written rows from summary of {@link ClickHouseResponse}).Important! if writtenRows is greater than MAX_INT then it will return MAX_INT.
* @return
*/
@Override
public Mono<Integer> getRowsUpdated() {

return updatedCount.map(val -> {
UpdateCount updateCount = (UpdateCount) val;
if (updateCount.value() > Integer.MAX_VALUE)
return Integer.MAX_VALUE;
else return Long.valueOf(updateCount.value()).intValue();
});
}

@Override
public <T> Publisher<T> map(BiFunction<Row, RowMetadata, ? extends T> biFunction) {
return rowSegments.cast(RowSegment.class)
.map(RowSegment::row).handle((row, sink) -> {
try {
sink.next(biFunction.apply(row, row.getMetadata()));
} catch (Exception e) {
log.error("Provided function caused exception:", e);
}
});
}

@Override
public Result filter(Predicate<Segment> predicate) {
return new ClickHouseResult(segments.filter(predicate), updatedCount.filter(predicate));
}

@Override
public <T> Publisher<T> flatMap(Function<Segment, ? extends Publisher<? extends T>> function) {
return segments.flatMap(segment -> {
try {
Publisher<? extends T> retValue = function.apply(segment);
if (retValue == null) {
return Mono.error(new IllegalStateException("flatmap function returned null value"));
}
return retValue;
} catch (Exception e) {
log.error("Provided function caused exception:", e);
return Mono.error(e);
}
});
}


class RowSegment implements Result.RowSegment {

final ClickHouseRow row;

RowSegment(ClickHouseRow row) {
this.row = row;
}

@Override
public Row row() {
return row;
}
}

class UpdateCount implements Result.UpdateCount {

final long updateCount;

UpdateCount(long updateCount) {
this.updateCount = updateCount;
}

@Override
public long value() {
return updateCount;
}
}
}
Loading

0 comments on commit d9a6f24

Please sign in to comment.