Skip to content

Commit

Permalink
example is working
Browse files Browse the repository at this point in the history
  • Loading branch information
rernas35 committed Jul 24, 2022
1 parent 3ef8f0d commit 14f931f
Show file tree
Hide file tree
Showing 8 changed files with 60 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.sql.Timestamp;
import java.time.LocalDateTime;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.stream.Stream;
Expand Down Expand Up @@ -77,6 +79,9 @@ private Object safeValue(Object o) {
throw new IllegalArgumentException(UNSUPPORTED_DATATYPE_BLOB);
} else if (o instanceof Clob) {
throw new IllegalArgumentException(UNSUPPORTED_DATATYPE_CLOB);
} else if (o instanceof LocalDateTime) {
LocalDateTime dateTime = (LocalDateTime) o;
return (Timestamp.valueOf(dateTime).getTime() / 1000);
} else if (o instanceof Parameter) {
Object value = ((Parameter) o).getValue();
if (value == null)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@
<artifactId>clickhouse-r2dbc</artifactId>
<version>0.3.3-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-jsr310</artifactId>
<version>2.13.3</version>
</dependency>
</dependencies>


Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,24 @@
@Configuration
public class R2DBCConfig {

@Value("${clickhouse.hosdot:localhost}")
@Value("${clickhouse.host:localhost}")
private String host;

@Value("${clickhouse.port:9100}")
@Value("${clickhouse.port:8123}")
private String port;

@Value("${clickhouse.database:clickdb}")
private String database;

@Value("${clickhouse.user:default}")
private String user;

@Value("${clickhouse.password:''}")
private String password;

@Bean
public ConnectionFactory connectionFactory() {
return get(format("r2dbc:clickhouse:grpc://@%s:%d/%s", host, Integer.parseInt(port), database));
return get(format("r2dbc:clickhouse:http://%s:%s@%s:%d/%s", user, password,
host, Integer.parseInt(port), database));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,18 @@
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseStatus;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.time.LocalDateTime;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

@RestController
@RequestMapping("/clicks")
Expand All @@ -26,14 +31,16 @@ public class ClickController {
ClickRepository clickRepository;

@GetMapping("/{domain}")
public Publisher<ResponseEntity<List<ClickStats>>> getEmployeeById(@PathVariable("domain") String domain) {
return clickRepository.getStatsByDomain(domain).collect(Collectors.toList()).map(list -> ResponseEntity.ok(list));
public Publisher<List<ClickStats>> getEmployeeById(@PathVariable("domain") String domain) {
return Flux.from(clickRepository.getStatsByDomain(domain).collect(Collectors.toList()));
}

@PostMapping
@ResponseStatus(HttpStatus.CREATED)
public Publisher<ResponseEntity> add(Click click){
return clickRepository.add(new ClickStats(click.getDomain(), click.getPath(), LocalDateTime.now(), 1)).map(v -> ResponseEntity.ok().build());
public Mono<Void> add(@RequestBody Click click){
return Mono.from(clickRepository.add(click));


}

}
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
package com.clickhouse.r2dbc.spring.webflux.sample.model;


import java.time.LocalDateTime;
import com.fasterxml.jackson.annotation.JsonFormat;

import java.time.LocalDate;

public class ClickStats {
private String domain;
private String path;
private LocalDateTime cdate;
@JsonFormat(pattern = "yyyy-MM-dd")
private LocalDate cdate;
private long count;

public ClickStats(String domain, String path, LocalDateTime date, long count) {
public ClickStats(String domain, String path, LocalDate date, long count) {
this.domain = domain;
this.path = path;
this.cdate = date;
Expand Down Expand Up @@ -40,11 +43,11 @@ public void setCount(long count) {
this.count = count;
}

public LocalDateTime getCdate() {
public LocalDate getCdate() {
return cdate;
}

public void setCdate(LocalDateTime cdate) {
public void setCdate(LocalDate cdate) {
this.cdate = cdate;
}
}
Original file line number Diff line number Diff line change
@@ -1,12 +1,17 @@
package com.clickhouse.r2dbc.spring.webflux.sample.repository;

import com.clickhouse.r2dbc.spring.webflux.sample.model.Click;
import com.clickhouse.r2dbc.spring.webflux.sample.model.ClickStats;
import io.r2dbc.spi.Connection;
import io.r2dbc.spi.ConnectionFactory;
import io.r2dbc.spi.Result;
import org.reactivestreams.Publisher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Repository;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.time.LocalDate;
import java.time.LocalDateTime;


Expand All @@ -19,19 +24,24 @@ public class ClickRepository {

public Flux<ClickStats> getStatsByDomain(String domain){
return Mono.from(connectionFactory.create())
.flatMapMany(conn -> conn.createStatement("select domain, path, count(1) from clickdb.clicks where domain = :domain group by domain, path")
.flatMapMany(conn -> conn.createStatement("select domain, path, toDate(cdate) as d, count(1) as count from clickdb.clicks where domain = :domain group by domain, path, d")
.bind("domain", domain)
.execute())
.flatMap(result -> result.map((row, rowMetadata) -> new ClickStats(row
.get("domain", String.class), row.get("path", String.class), row.get("cdate", LocalDateTime.class), row.get("count", Long.class))));
.get("domain", String.class), row.get("path", String.class), row.get("d", LocalDate.class), row.get("count", Long.class))));
}

public Mono<Void> add(ClickStats click){
return Mono.from(connectionFactory.create()).map(conn -> conn.createStatement("insert into clickdb.clicks values (:domain, :path, :cdate)")
.bind("domain", click.getDomain())
.bind("path", click.getPath())
.bind("cdate", click.getCdate()).execute())
.then();
public Mono<Void> add(Click click){
return Mono.from(connectionFactory.create())
.flatMapMany(conn -> execute(click, conn)).then();
}

private Publisher<? extends Result> execute(Click click, Connection conn) {
return conn.createStatement("insert into clickdb.clicks values (:domain, :path, :cdate, :count)")
.bind("domain", click.getDomain())
.bind("path", click.getPath())
.bind("cdate", LocalDateTime.now())
.bind("count", 1).execute();
}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
clickhouse:
host: localhost
port: 9100
port: 8123
database: clickdb
user: default
password: ""
debug: true

0 comments on commit 14f931f

Please sign in to comment.