Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

r2dbc module is added. #914

Merged
merged 2 commits into from
Sep 11, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions clickhouse-r2dbc/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# clickhouse-r2dbc

This module provides r2dbc support to clickhouse-jdbc driver.

r2dbc link : https://r2dbc.io/

Sample code:
```java
ConnectionFactory connectionFactory = ConnectionFactories
.get("r2dbc:clickhouse:http://{username}:{password}@{host}:{port}/{database}");

Mono.from(connectionFactory.create())
.flatMapMany(connection -> connection
.createStatement("select domain, path, toDate(cdate) as d, count(1) as count from clickdb.clicks where domain = :domain group by domain, path, d")
.bind("domain", domain)
.execute())
.flatMap(result -> result
.map((row, rowMetadata) -> String.format("%s%s[%s]:%d", row.get("domain", String.class),
row.get("path", String.class),
row.get("d", LocalDate.class),
row.get("count", Long.class)) ))
.doOnNext(System.out::println)
.subscribe();
```

for full example please check clickhouse-jdbc/examples/clickhouse-r2dbc-samples/clickhouse-r2dbc-spring-webflux-sample .
307 changes: 307 additions & 0 deletions clickhouse-r2dbc/pom.xml

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package com.clickhouse.r2dbc;

import com.clickhouse.client.ClickHouseFormat;
import com.clickhouse.client.ClickHouseRequest;
import io.r2dbc.spi.Batch;
import io.r2dbc.spi.Result;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.util.ArrayList;
import java.util.List;

public class ClickHouseBatch implements Batch {

private static final ClickHouseFormat PREFERRED_FORMAT = ClickHouseFormat.TabSeparatedWithNamesAndTypes;
private ClickHouseRequest<?> request;
final List<String> sqlList;

public ClickHouseBatch(ClickHouseRequest<?> request) {
this.request = request;
this.sqlList = new ArrayList<>();
}

@Override
public Batch add(String sql) {
sqlList.add(sql);
return this;
}

@Override
public Publisher<? extends Result> execute() {
return Flux.fromStream(sqlList.stream().map(sql -> {
request.query(sql).format(PREFERRED_FORMAT);
return Mono.fromFuture(request::execute); }))
.flatMap(Mono::flux)
.map(ClickHouseResult::new);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package com.clickhouse.r2dbc;

import com.clickhouse.client.ClickHouseColumn;
import com.clickhouse.r2dbc.types.ClickHouseDataTypeWrapper;
import io.r2dbc.spi.ColumnMetadata;
import io.r2dbc.spi.Type;

public class ClickHouseColumnMetadata implements ColumnMetadata {

final Type type;
final String name;

ClickHouseColumnMetadata(ClickHouseColumn col) {
this.name = col.getColumnName(); // TODO :check alias handling.
this.type = ClickHouseDataTypeWrapper.of(col.getDataType());
}

@Override
public Type getType() {
return type;
}

@Override
public String getName() {
return name;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
package com.clickhouse.r2dbc;

import com.clickhouse.client.ClickHouseResponse;
import com.clickhouse.client.ClickHouseResponseSummary;
import com.clickhouse.client.logging.Logger;
import com.clickhouse.client.logging.LoggerFactory;
import io.r2dbc.spi.Result;
import io.r2dbc.spi.Row;
import io.r2dbc.spi.RowMetadata;
import org.apache.commons.lang3.tuple.Pair;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.StreamSupport;

public class ClickHouseResult implements Result {

private static final Logger log = LoggerFactory.getLogger(ClickHouseResult.class);

private final Flux<? extends Result.Segment> rowSegments;
private final Mono<? extends Result.Segment> updatedCount;
private final Flux<? extends Result.Segment> segments;

ClickHouseResult(ClickHouseResponse response) {
this.rowSegments = Mono.just(response)
.flatMapMany(resp -> Flux
.fromStream(StreamSupport.stream(resp.records().spliterator(), false)
.map(rec -> Pair.of(resp.getColumns(), rec))))
.map(pair -> new ClickHouseRow(pair.getRight(), pair.getLeft()))
.map(RowSegment::new);
this.updatedCount = Mono.just(response).map(ClickHouseResponse::getSummary)
.map(ClickHouseResponseSummary::getProgress)
.map(ClickHouseResponseSummary.Progress::getWrittenRows)
.map(UpdateCount::new);
this.segments = Flux.concat(this.updatedCount, this.rowSegments);
}

ClickHouseResult(Flux<? extends Result.Segment> rowSegments, Mono<? extends Result.Segment> updatedCount) {
this.rowSegments = rowSegments;
this.updatedCount = updatedCount;
this.segments = Flux.concat(this.updatedCount, this.rowSegments);
}

/**
* Returns updated count(written rows from summary of {@link ClickHouseResponse}).Important! if writtenRows is greater than MAX_INT then it will return MAX_INT.
*
* @return updated count
*/
@Override
public Mono<Long> getRowsUpdated() {
return updatedCount.map(val -> ((UpdateCount) val).value());
}

@Override
public <T> Publisher<T> map(BiFunction<Row, RowMetadata, ? extends T> biFunction) {
return rowSegments.cast(RowSegment.class)
.map(RowSegment::row).handle((row, sink) -> {
try {
sink.next(biFunction.apply(row, row.getMetadata()));
} catch (Exception e) {
log.error("Provided function caused exception:", e);
}
});
}

@Override
public Result filter(Predicate<Segment> predicate) {
return new ClickHouseResult(segments.filter(predicate), updatedCount.filter(predicate));
}

@Override
public <T> Publisher<T> flatMap(Function<Segment, ? extends Publisher<? extends T>> function) {
return segments.flatMap(segment -> {
try {
Publisher<? extends T> retValue = function.apply(segment);
if (retValue == null) {
return Mono.error(new IllegalStateException("flatmap function returned null value"));
}
return retValue;
} catch (Exception e) {
log.error("Provided function caused exception:", e);
return Mono.error(e);
}
});
}


class RowSegment implements Result.RowSegment {

final ClickHouseRow row;

RowSegment(ClickHouseRow row) {
this.row = row;
}

@Override
public Row row() {
return row;
}
}

class UpdateCount implements Result.UpdateCount {

final long updateCount;

UpdateCount(long updateCount) {
this.updateCount = updateCount;
}

@Override
public long value() {
return updateCount;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package com.clickhouse.r2dbc;

import com.clickhouse.client.ClickHouseColumn;
import com.clickhouse.client.ClickHouseRecord;
import io.r2dbc.spi.Row;
import io.r2dbc.spi.RowMetadata;

import java.util.LinkedHashMap;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.Function;
import java.util.stream.Collectors;

public class ClickHouseRow implements Row {

final ClickHouseRecord record;
final ClickHouseRowMetadata rowMetadata;

ClickHouseRow(ClickHouseRecord record, List<ClickHouseColumn> columnList) {
this.record = record;
this.rowMetadata = new ClickHouseRowMetadata(columnList.stream()
.map(ClickHouseColumnMetadata::new)
.collect(Collectors
.toMap(ClickHouseColumnMetadata::getName,
Function.identity(),
(v1,v2) -> v2, // since every key will be unique, won't need to merge so just overwrite with the latest one.
LinkedHashMap::new)));
}

@Override
public RowMetadata getMetadata() {
return rowMetadata;
}

@Override
public <T> T get(int i, Class<T> aClass) {
return aClass.cast(record.getValue(i).asObject(aClass));
}

@Override
public <T> T get(String name, Class<T> aClass) {
try {
return aClass.cast(record.getValue(name).asObject(aClass));
} catch (IllegalArgumentException e) {
throw new NoSuchElementException(String.format("Unknown element with a name %s", name));
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package com.clickhouse.r2dbc;

import io.r2dbc.spi.ColumnMetadata;
import io.r2dbc.spi.RowMetadata;

import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;

public class ClickHouseRowMetadata implements RowMetadata {

LinkedHashMap<String, ClickHouseColumnMetadata> columnNameMetadataMap;

ClickHouseRowMetadata( LinkedHashMap<String, ClickHouseColumnMetadata> columnNameMetadataMap) {
this.columnNameMetadataMap = columnNameMetadataMap;
}

@Override
public ColumnMetadata getColumnMetadata(int i) {
if (i > columnNameMetadataMap.size())
throw new IllegalArgumentException("Given index is greater than size column metadata array.");
return columnNameMetadataMap.entrySet().stream().skip(i-1).findFirst().get().getValue();
}

@Override
public ColumnMetadata getColumnMetadata(String columnName) {
return columnNameMetadataMap.get(columnName);
}

@Override
public List<? extends ColumnMetadata> getColumnMetadatas() {
return Collections.unmodifiableList(new ArrayList<>(columnNameMetadataMap.values()));
}
}
Loading