Skip to content

Commit

Permalink
Merge branch 'develop' into 'master'
Browse files Browse the repository at this point in the history
Release 0.35.0-SNAPSHOT

See merge request hercules/hercules!218
  • Loading branch information
gnkoshelev committed Feb 11, 2020
2 parents 7d33787 + 1c0610a commit b90754f
Show file tree
Hide file tree
Showing 95 changed files with 1,798 additions and 366 deletions.
2 changes: 1 addition & 1 deletion hercules-application/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<artifactId>hercules</artifactId>
<groupId>ru.kontur.vostok.hercules</groupId>
<version>0.34.0-SNAPSHOT</version>
<version>0.35.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down
2 changes: 1 addition & 1 deletion hercules-auth/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
<parent>
<artifactId>hercules</artifactId>
<groupId>ru.kontur.vostok.hercules</groupId>
<version>0.34.0-SNAPSHOT</version>
<version>0.35.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down
2 changes: 1 addition & 1 deletion hercules-cassandra-sink/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<artifactId>hercules</artifactId>
<groupId>ru.kontur.vostok.hercules</groupId>
<version>0.34.0-SNAPSHOT</version>
<version>0.35.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down
2 changes: 1 addition & 1 deletion hercules-cassandra-util/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
<parent>
<artifactId>hercules</artifactId>
<groupId>ru.kontur.vostok.hercules</groupId>
<version>0.34.0-SNAPSHOT</version>
<version>0.35.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down
38 changes: 38 additions & 0 deletions hercules-clickhouse-sink/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>hercules</artifactId>
<groupId>ru.kontur.vostok.hercules</groupId>
<version>0.35.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>hercules-clickhouse-sink</artifactId>

<dependencies>
<dependency>
<groupId>ru.kontur.vostok.hercules</groupId>
<artifactId>hercules-clickhouse-util</artifactId>
</dependency>
<dependency>
<groupId>ru.kontur.vostok.hercules</groupId>
<artifactId>hercules-sink</artifactId>
</dependency>

<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-checkstyle-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
package ru.kontur.vostok.hercules.clickhouse.sink;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import ru.kontur.vostok.hercules.clickhouse.util.ClickHouseConnector;
import ru.kontur.vostok.hercules.health.AutoMetricStopwatch;
import ru.kontur.vostok.hercules.health.MetricsCollector;
import ru.kontur.vostok.hercules.health.Timer;
import ru.kontur.vostok.hercules.kafka.util.processing.BackendServiceFailedException;
import ru.kontur.vostok.hercules.protocol.Event;
import ru.kontur.vostok.hercules.sink.ProcessorStatus;
import ru.kontur.vostok.hercules.sink.Sender;
import ru.kontur.vostok.hercules.util.properties.PropertiesUtil;
import ru.yandex.clickhouse.ClickHouseConnection;
import ru.yandex.clickhouse.ClickHouseStatement;
import ru.yandex.clickhouse.domain.ClickHouseFormat;
import ru.yandex.clickhouse.util.ClickHouseRowBinaryStream;

import java.io.IOException;
import java.sql.SQLException;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
* Base ClickHouse sender.
* <p>
* Events are converted to satisfy SQL-query {@link #query()} in according to table schema in DB.
* Sender inserts events by batches to ClickHouse using binary format.
* Thus, accurate implementation of {@link #write(ClickHouseRowBinaryStream, Event)} method is required.
*
* @author Gregory Koshelev
*/
public abstract class ClickHouseSender extends Sender {
private static final Logger LOGGER = LoggerFactory.getLogger(ClickHouseSender.class);

private final ClickHouseConnector connector;

private final Timer processingTimeMsTimer;

public ClickHouseSender(Properties properties, MetricsCollector metricsCollector) {
super(properties, metricsCollector);

Properties clickhouseProperties = PropertiesUtil.ofScope(properties, "clickhouse");
this.connector = new ClickHouseConnector(clickhouseProperties);

this.processingTimeMsTimer = metricsCollector.timer("processingTimeMs");
}

@Override
protected int send(List<Event> events) throws BackendServiceFailedException {
ClickHouseConnection conn = getConnection();

final AtomicInteger count = new AtomicInteger(0);
try (ClickHouseStatement statement = conn.createStatement();
AutoMetricStopwatch ignored = new AutoMetricStopwatch(processingTimeMsTimer, TimeUnit.MILLISECONDS)) {
statement.write().send(
query(),
stream -> {
for (Event event : events) {
if (write(stream, event)) {
count.incrementAndGet();
}
}
},
ClickHouseFormat.RowBinary);
} catch (SQLException ex) {
throw new BackendServiceFailedException(ex);
} catch (RuntimeException ex) {
LOGGER.error("Unexpected error has been acquired", ex);
throw new BackendServiceFailedException(ex);
}

return count.get();
}

@Override
protected ProcessorStatus ping() {
return connector.isConnected() ? ProcessorStatus.AVAILABLE : ProcessorStatus.UNAVAILABLE;
}

@Override
public boolean stop(long timeout, TimeUnit unit) {
boolean stopped;
try {
stopped = super.stop(timeout, unit);
} finally {
connector.close();
}
return stopped;
}

private ClickHouseConnection getConnection() throws BackendServiceFailedException {
return (ClickHouseConnection) connector.connection().orElseThrow(BackendServiceFailedException::new);
}

/**
* INSERT query.
* <p>
* Query must be like {@code INSERT INTO [db.]table [(column1, column2, column3)]}.
*
* @return INSERT query
*/
protected abstract String query();

/**
* Write the event to row binary stream.
*
* @param event the event
* @return {@code true} if the event has been written, otherwise {@code false}
*/
protected abstract boolean write(ClickHouseRowBinaryStream stream, Event event) throws IOException;
}
35 changes: 35 additions & 0 deletions hercules-clickhouse-util/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>hercules</artifactId>
<groupId>ru.kontur.vostok.hercules</groupId>
<version>0.35.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>hercules-clickhouse-util</artifactId>


<dependencies>
<dependency>
<groupId>ru.kontur.vostok.hercules</groupId>
<artifactId>hercules-util</artifactId>
</dependency>

<dependency>
<groupId>ru.yandex.clickhouse</groupId>
<artifactId>clickhouse-jdbc</artifactId>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-checkstyle-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
Loading

0 comments on commit b90754f

Please sign in to comment.