Skip to content

Commit

Permalink
Feature/summingmergetree (#57)
Browse files Browse the repository at this point in the history
* Added summingmergetree engine

* Test and fixes for summing merge tree
  • Loading branch information
erdebee authored and ecyshor committed Aug 29, 2018
1 parent 54e4091 commit 4362ba3
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,42 @@ object Engine {
indexGranularity: Int): ReplacingMergeTree =
apply(monthPartitionCompat(dateColumn), primaryKey, samplingExpression, indexGranularity)
}
//SummingMergeTree(EventDate, (OrderID, EventDate, BannerID, ...), 8192)

case class AggregatingMergeTree(partition: Seq[String],

case class SummingMergeTree(partition: Seq[String],
primaryKey: Seq[Column],
summingColumns: Seq[Column] = Seq.empty,
samplingExpression: Option[String] = None,
indexGranularity: Int = MergeTreeEngine.DefaultIndexGranularity)
extends MergeTreeEngine("SummingMergeTree") {

override def toString: String = {
val summingColArg =
if (summingColumns.isEmpty) ""
else "((" + summingColumns.map(_.name).mkString(", ") + "))"

s"""$name$summingColArg
|${statements.mkString("\n")}""".stripMargin
}
}

object SummingMergeTree {
def apply(dateColumn: NativeColumn[LocalDate], primaryKey: Seq[Column]): SummingMergeTree =
apply(monthPartitionCompat(dateColumn), primaryKey)

def apply(dateColumn: NativeColumn[LocalDate], primaryKey: Seq[Column], summingColumns: Seq[Column]): SummingMergeTree =
apply(monthPartitionCompat(dateColumn), primaryKey, summingColumns)

def apply(dateColumn: NativeColumn[LocalDate],
primaryKey: Seq[Column],
summingColumns: Seq[Column],
samplingExpression: Option[String],
indexGranularity: Int): SummingMergeTree =
apply(monthPartitionCompat(dateColumn), primaryKey, summingColumns, samplingExpression, indexGranularity)
}

case class AggregatingMergeTree(partition: Seq[String],
primaryKey: Seq[Column],
samplingExpression: Option[String] = None,
indexGranularity: Int = MergeTreeEngine.DefaultIndexGranularity)
Expand Down Expand Up @@ -140,7 +174,11 @@ object Engine {

case class Replicated(zookeeperPath: String, replicaName: String, engine: MergeTreeEngine) extends Engine {
override def toString: String = {
val replicationArgs = Seq(zookeeperPath, replicaName).map(StringQueryValue(_)).mkString(", ")
val summingColArg = Seq(engine).collect { case s:SummingMergeTree => "(" + s.summingColumns.map(_.name).mkString(", ") + ")"}

val replicationArgs = (
Seq(zookeeperPath, replicaName).map(StringQueryValue(_)
) ++ summingColArg).mkString(", ")

s"""Replicated${engine.name}($replicationArgs)
|${engine.statements.mkString("\n")}""".stripMargin
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import com.crobox.clickhouse.dsl.AggregateFunction.StateResult
import com.crobox.clickhouse.dsl.{NativeColumn, RefColumn, TableColumn, UInt32}
import com.crobox.clickhouse.dsl.TestSchema.TestTable
import com.crobox.clickhouse.dsl.schemabuilder.DefaultValue.Default
import com.crobox.clickhouse.dsl.schemabuilder.Engine.SummingMergeTree
import org.joda.time.LocalDate
import org.scalatest.{FlatSpecLike, Matchers}

Expand Down Expand Up @@ -249,4 +250,26 @@ class CreateTableTest extends FlatSpecLike with Matchers {
|SETTINGS index_granularity=8192""".stripMargin)
}

it should "create a table with an SummingMergeTree engine" in {
val date = NativeColumn[LocalDate]("date", ColumnType.Date)
val client_count = NativeColumn("client_count", ColumnType.UInt8)
val summingColumns = Seq(client_count)

val create = CreateTable(
TestTable(
"test_table_agg",
Seq(date, client_count)
),
SummingMergeTree(date, Seq(date),summingColumns)
)

create.toString should be (
"""CREATE TABLE default.test_table_agg (
| date Date,
| client_count UInt8
|) ENGINE = SummingMergeTree((client_count))
|PARTITION BY (toYYYYMM(date))
|ORDER BY (date)
|SETTINGS index_granularity=8192""".stripMargin)
}
}

0 comments on commit 4362ba3

Please sign in to comment.