ClickHouse (Clickstream Data Warehouse) is a column-oriented Database Management System (DBMS) for Online Analytical Processing (OLAP), focused on providing fast responses to complex analytical queries over massive amounts of data ("can easily be 1,000x faster than row-oriented DBMS when used in OLAP scenarios").
Base ClickHouse data types: Note that ANSI SQL data types all have appropriate aliases
- ClickHouse can also read and write in dozens of formats (https://clickhouse.com/docs/en/interfaces/formats)
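As an example of the type aliases, standard SQL type names can be used interchangeably with the native names; a minimal sketch (the table name is made up for illustration):
CREATE TABLE alias_demo
(
    id    INT,       -- alias for Int32
    name  VARCHAR,   -- alias for String
    score DOUBLE     -- alias for Float64
)
ENGINE = MergeTree
ORDER BY id;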
Running on own machine: (using Docker)
docker run -d --name some-clickhouse-server --ulimit nofile=262144:262144 clickhouse
starts the ClickHouse server.
docker exec -it some-clickhouse-server clickhouse-client
drops the user into the built-in ClickHouse client CLI.
ClickHouse has many different ways to retrieve and store data in order to optimize storage and access. Each table has an Engine (https://clickhouse.com/docs/en/engines/table-engines) which determines how and where its data is stored (local file system, in-memory, external systems, materialized views, ...), defined using ENGINE = <engine_name> in the CREATE TABLE query.
- It also defines which queries are supported, concurrent data access, multithreading request capabilities, replication, and more.
- There are many engines to integrate ClickHouse with external systems.
Almost always the MergeTree table engine (or one of the other engines in the MergeTree family) will be used as they are the most universal and functional table engines for high-load tasks.
CREATE TABLE my_db.my_table
(
column1 FixedString(1),
column2 UInt32,
column3 String
)
ENGINE = MergeTree
ORDER BY (column1, column2)
The primary key determines how the data is stored and searched; if no PRIMARY KEY is specifically defined, ClickHouse uses the ORDER BY clause. Primary keys in ClickHouse are not (necessarily) unique to each row (pick the columns that are most frequently searched on). The best strategy is "lower cardinality values early in the primary key", which allows ClickHouse to skip more of the granules that are straight away not desired.
INSERTs are performed in bulk, creating a part (stored in its own folder). Each column in a part has an immutable file with just that column's data:
A granule is the smallest indivisible data set that ClickHouse reads when searching rows. Once ClickHouse knows which granules need to be searched, they are grouped into stripes of granules and each stripe is sent to a thread for processing (stripes are processed concurrently, with faster threads stealing tasks from slower threads).
ClickHouse then merges these created part folders over time (partly why it's called a MergeTree), deleting the unused, redundant parts in favour of the coalesced merged part (an unscheduled merge can be initialized using OPTIMIZE TABLE my_table FINAL; though it is not recommended). This process is then continued on the merged part folders, creating a cascading tree of merges (note that this will not end in a single folder as there is a size limit enforced).
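The parts and their merge level can be observed through the system.parts table; a minimal sketch, assuming a table my_db.my_table already exists:
SELECT
    name,
    rows,
    marks,                                             -- number of index marks (approximately the number of granules)
    formatReadableSize(bytes_on_disk) AS size_on_disk,
    level                                              -- how many times the part has been merged
FROM system.parts
WHERE (database = 'my_db') AND (table = 'my_table') AND active
ORDER BY modification_time DESC;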
As in most DBMSs, ClickHouse logically groups tables into databases (CREATE DATABASE my_database), with the predefined databases (required for ClickHouse management processes) being created at server initialisation:
Note that databases also have engines; with the default database engine being Atomic, unless using ClickHouse Cloud in which case the default database engine is Replicated.
Table creation also follows traditional DBMSs with the exception of the engine:
Aside from all the expected SQL-like data types that ClickHouse provides, it additionally provides some more unique and interesting types like:
- Array(T) defines an array where all the elements have the same data type; arrays can be defined using [] or array().
- Nullable(T) allows NULL to be used for missing values (if it is not important to the business logic, do not use it, as it increases the dimensionality of the table).
- Enum(T) is used for defining enumerations; an enum column can only contain values in its definition, otherwise an exception is thrown.
- LowCardinality(T) is useful when a column has a relatively small number of unique values (hard to quantify, but can be useful with <=10,000 unique values), as it uses dictionary encoding to store the values as integers. It has an advantage over Enum in that new values can be added dynamically (no need to know all unique values at creation time). See the sketch after this list.
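A minimal sketch pulling these types together (the table and columns are hypothetical):
CREATE TABLE my_db.type_examples
(
    tags   Array(String),                       -- e.g. ['a', 'b']
    rating Nullable(UInt8),                     -- NULL when missing
    status Enum('active' = 1, 'inactive' = 2),  -- only these values are allowed
    city   LowCardinality(String)               -- dictionary-encoded strings
)
ENGINE = MergeTree
ORDER BY city;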
Primary keys can be defined in several ways:
- Inside column list:
- Outside column list:
- Using ORDER BY instead of PRIMARY KEY (with no PRIMARY KEY clause)
If both ORDER BY and PRIMARY KEY are defined, the primary key must be a prefix of the ORDER BY tuple:
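A minimal sketch of the prefix rule (hypothetical table):
CREATE TABLE events
(
    type    LowCardinality(String),
    user_id UInt64,
    ts      DateTime
)
ENGINE = MergeTree
ORDER BY (type, user_id, ts)  -- data sorted by all three columns
PRIMARY KEY (type, user_id);  -- index built only on a prefix of the ORDER BY tuple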
Good candidates for primary key columns are ones which are queried on frequently, ordered by cardinality in ascending order (at some point adding a column to a primary key no longer provides any benefit).
For creating additional primary indexes consider:
- creating two tables for the same data with different primary keys.
- use a projection (ClickHouse creates a hidden table that stores the data sorted in a different way)
- use a materialized view, a separate table based on a SELECT statement.
- define a skipping index
It's logical to assume that partitioning (logical combination of records in a table by a specific criterion) improves query performance, and partitions can help limit the number of granules searched, but partitions (in ClickHouse) are mostly for data management. Partitions can improve the performance of mutations, moving data around, retention policies, and so on. In general, to improve performance focus on defining a good primary key.
In most cases a partition key is not required; and in most other cases, a partition key more granular than by month is not required.
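A minimal sketch of a monthly partition key (hypothetical table), used for data management rather than query speed:
CREATE TABLE page_views
(
    url String,
    ts  DateTime
)
ENGINE = MergeTree
PARTITION BY toYYYYMM(ts)  -- one partition per month
ORDER BY (url, ts);

-- Dropping a whole month of data then becomes a cheap metadata operation
ALTER TABLE page_views DROP PARTITION 202401;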
Sometimes ClickHouse can infer the column names and data types (so the schema might not have to be defined explicitly). ClickHouse can also often figure out the format of the data file (from the filename extension) and its compression (from its extension).
ClickHouse supports over 75 data formats (TSV, CSV, ..., 20+ formats for JSON data, ..., Protobuf, Parquet, Arrow, ...)
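A minimal sketch of schema inference with the file() table function (the file name is hypothetical; the format and compression are inferred from the extensions):
-- Show the inferred column names and types
DESCRIBE file('trips.csv.gz');

-- Query the file directly, or create a table from the inferred schema
SELECT count() FROM file('trips.csv.gz');

CREATE TABLE trips
ENGINE = MergeTree
ORDER BY tuple() AS
SELECT * FROM file('trips.csv.gz');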
A table engine has to be used for publisher-subscriber platforms like Kafka (or ClickPipes if using ClickHouse Cloud).
Integration table engines in general store all the connection details: the type of file, the schema, the credentials, etc. These do not have to be entered every time the data is accessed; like a proxy, the files stay on the third-party server but can be queried as if they were on the ClickHouse server (when queried, the data is streamed to the ClickHouse server).
PostgreSQL and MySQL have special database engines as well:
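A minimal sketch of both approaches (hosts and credentials are made up): a single proxied table via the MySQL table engine, and a whole remote database exposed via the MySQL database engine:
-- One remote table, queried as if it were local
CREATE TABLE mysql_orders
(
    id    UInt64,
    total Decimal(10, 2)
)
ENGINE = MySQL('mysql-host:3306', 'shop', 'orders', 'user', 'password');

-- Or expose every table of the remote database
CREATE DATABASE shop_mirror
ENGINE = MySQL('mysql-host:3306', 'shop', 'user', 'password');

SELECT count() FROM shop_mirror.orders;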
The concept of views in ClickHouse is similar to views in other DBMSs, with the contents of a view being based on the results of a SELECT query.
ClickHouse additionally facilitates Parameterized Views, allowing for the view definition to change based on some parameters that can be fed at query execution time.
CREATE VIEW raw_data_parametrized AS
SELECT *
FROM raw_data
WHERE (id >= {id_from:UInt32}) AND (id <= {id_to:UInt32});
clickhouse-cloud :) SELECT count() FROM raw_data_parametrized(id_from=0, id_to=50000);
SELECT count()
FROM raw_data_parametrized(id_from = 0, id_to = 50000)
Query id: 5731aae1-3e68-4e63-b57f-d50f29055744
┌─count()─┐
│ 317019 │
└─────────┘
1 row in set. Elapsed: 0.004 sec. Processed 319.49 thousand rows, 319.49 KB (76.29 million rows/s., 76.29 MB/s.)
Materialized Views in ClickHouse are INSERT triggers that store the result of a query inside another destination table. This means that when an INSERT happens to the source table of the SELECT query, the query is executed on the newly-inserted rows and the result is inserted into the MV table (there is no trigger on DELETE, UPDATE, etc.).
Note that for each MV, ClickHouse creates a hidden table in addition to the materialized view, called .inner.{uuid} (this has to do with how MVs work in ClickHouse). Instead of having ClickHouse implicitly create .inner.{uuid} as the hidden table, one can define an explicit table for a view, and then define a materialized view that sends its data "to" the explicit table (separating the view from its underlying table):
- Define the destination table:
CREATE TABLE uk_price_by_town_dest
(
    price UInt32,
    date Date,
    street LowCardinality(String),
    town LowCardinality(String),
    district LowCardinality(String)
)
ENGINE = MergeTree
ORDER BY town;
- Define the MV using the TO clause "to" the destination table:
CREATE MATERIALIZED VIEW uk_price_by_town_view TO uk_price_by_town_dest AS
(
    SELECT price, date, street, town, district
    FROM uk_price_paid
    WHERE date >= toDate('2024-02-19 12:30:00') -- pick a time in the "not too distant" future
);
- Populate the destination table with historic data:
INSERT INTO uk_price_by_town_dest
SELECT price, date, street, town, district
FROM uk_price_paid
WHERE date < toDate('2024-02-19 12:30:00');
Materialized Views on their own cannot handle running average calculations:
CREATE TABLE some_numbers
(
`id` UInt32,
`x` UInt32
)
ENGINE = MergeTree
PRIMARY KEY id
Query id: f9a828cc-52e6-493a-b4b5-222872bac207
Ok.
CREATE TABLE agg_of_some_numbers
(
`id` UInt32,
`max_id` UInt32,
`avg_id` UInt32
)
ENGINE = MergeTree
PRIMARY KEY id
Query id: 61284097-f0b0-476c-bb78-cb7d01efc183
Ok.
CREATE MATERIALIZED VIEW view_of_agg_of_some_numbers TO agg_of_some_numbers
AS SELECT
id,
max(x) AS max_id,
avg(x) AS avg_id
FROM some_numbers
GROUP BY id
Query id: 4c542be8-c5e9-4749-aff3-2fc5e157e5d7
Ok.
INSERT INTO some_numbers FORMAT Values -- (1,10), (1,20), (2, 300), (2,400)
Query id: 97b410e8-967f-49aa-a68c-8e903f162306
Ok.
The first insertion into the table will have the calculations work properly:
SELECT *
FROM agg_of_some_numbers
Query id: 72cdb105-024c-4c78-aef4-cb1282fa007e
┌─id─┬─max_id─┬─avg_id─┐
1. │ 1 │ 20 │ 15 │
2. │ 2 │ 400 │ 350 │
└────┴────────┴────────┘
2 rows in set. Elapsed: 0.004 sec.
INSERT INTO some_numbers FORMAT Values -- (1,1000), (2,20)
Query id: e038491f-a438-4373-b3c5-c2e612e306d8
Ok.
Further insertions into the table will result in the calculations being performed incorrectly, treating each block as entirely separate from the others; not the running aggregations that were expected:
SELECT *
FROM agg_of_some_numbers
Query id: 20438124-e997-480e-811e-27cb8a633e12
┌─id─┬─max_id─┬─avg_id─┐
1. │ 1 │ 1000 │ 1000 │
2. │ 2 │ 20 │ 20 │
└────┴────────┴────────┘
┌─id─┬─max_id─┬─avg_id─┐
3. │ 1 │ 20 │ 15 │
4. │ 2 │ 400 │ 350 │
└────┴────────┴────────┘
4 rows in set. Elapsed: 0.006 sec.
The special engine, AggregatingMergeTree, is designed specifically for dealing with (running) aggregations. This is useful when repetitively running aggregation queries in ClickHouse on (relatively) slowly changing data; instead of calculating them every time from scratch, a running aggregations can be used.
AggregatingMergeTree collapses rows with the same primary key (sort order) into a single record, with the set of values of the combined rows aggregated. The columns keep track of the "state" of each set of values, with the supported column types being AggregateFunction(T, U) and SimpleAggregateFunction(T, U):
- T : aggregation function to be used by the column
- U : data type to be used by the column
CREATE TABLE amt_of_some_numbers
(
`id` UInt32,
`max_column` SimpleAggregateFunction(max, UInt32),
`avg_column` AggregateFunction(avg, UInt32)
)
ENGINE = AggregatingMergeTree
PRIMARY KEY id
Query id: c79beea8-ccaa-4668-b4bd-9781f6001ac8
Ok.
CREATE MATERIALIZED VIEW view_of_amt_of_some_numbers TO amt_of_some_numbers
AS SELECT
id,
maxSimpleState(x) AS max_column,
avgState(x) AS avg_column
FROM some_numbers
GROUP BY id
Query id: 6cef55f6-e7ef-46de-aefc-1a7774d7cf43
Ok.
Note that the SimpleState/State combinator (suffix) functions must be used in the materialized view's SELECT to produce the SimpleAggregateFunction/AggregateFunction states.
INSERT INTO some_numbers FORMAT Values -- (1,10), (1,20), (2, 300), (2,400)
Query id: c111aac6-a3f3-4438-bf5c-a5e6146a02b4
Ok.
INSERT INTO some_numbers FORMAT Values -- (1,1000), (2,20)
Query id: c1b6addb-52cb-4b67-bfa2-dd11c717a7d7
Ok.
Some interesting results occur now when this table is queried: 1) the parts have not merged yet, and 2) the avg_column is storing binary (intermediate state) data.
SELECT *
FROM amt_of_some_numbers
Query id: 1ce49fad-f265-4b44-8052-926d453e6748
┌─id─┬─max_column─┬─avg_column─┐
1. │ 1 │ 1000 │ �♥☺ │
2. │ 2 │ 20 │ ¶☺ │
└────┴────────────┴────────────┘
┌─id─┬─max_column─┬─avg_column─┐
3. │ 1 │ 20 │ ▲☻ │
4. │ 2 │ 400 │ �☻☻ │
└────┴────────────┴────────────┘
4 rows in set. Elapsed: 0.009 sec.
Note that if the INSERT is done via a SELECT query, type coercion must be done using the (Simple)State combinator/suffix functions:
INSERT INTO amt_of_some_numbers
SELECT
    id,
    maxSimpleState(x) AS max_column,
    avgState(x) AS avg_column
FROM some_numbers
GROUP BY id
These unusual results are due to ClickHouse storing an intermediate state as opposed to the final result (it would not be able to calculate moving aggregates from a final value). Therefore, one must query the table using the appropriate aggregation functions with the Merge combinator/suffix (for AggregateFunction types):
SELECT
id,
max(max_column),
avgMerge(avg_column)
FROM amt_of_some_numbers
GROUP BY id
Query id: 1b0e7fea-f040-417e-80b9-02e87b89b094
┌─id─┬─max(max_column)─┬─avgMerge(avg_column)─┐
1. │ 2 │ 400 │ 240 │
2. │ 1 │ 1000 │ 343.3333333333333 │
└────┴─────────────────┴──────────────────────┘
2 rows in set. Elapsed: 0.012 sec.
The SummingMergeTree engine, similarly to AggregatingMergeTree, collapses rows with the same primary key (sort order) into a single record. It differs from AggregatingMergeTree by only summing the columns, maintaining the original numeric data type.
Due to this, there is no need to use the (Simple)State combinator/suffix functions or the (Simple)AggregateFunction data types:
CREATE TABLE prices_sum_dest
(
`town` LowCardinality(String),
`price` UInt32
)
ENGINE = SummingMergeTree
ORDER BY town
Note that there might still be multiple rows with the same primary key that have not yet been merged, hence queries should always aggregate with sum() and GROUP BY (see the example query after the view definition below):
CREATE MATERIALIZED VIEW price_sum_view TO prices_sum_dest
(
`town` String,
`price` UInt32
)
AS SELECT
town,
price
FROM uk_price_paid
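A minimal sketch of querying the SummingMergeTree destination, aggregating at read time since unmerged parts may still hold several rows per town:
SELECT
    town,
    sum(price) AS total_price
FROM prices_sum_dest
GROUP BY town
ORDER BY total_price DESC
LIMIT 10;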
Sharding provides scalability: splitting a table into multiple smaller tables, called shards (a table has one shard by default), stored on different servers. Do not shard a table unless it is really necessary (a lot of data can fit in a single shard); instead attempt to increase the machine's capacity (disk space, RAM, cores, etc.). Replication provides redundancy, with each shard consisting of >=1 replicas (containing the same data per shard), placed on different servers, so if one server fails, the data is still available. Each MergeTree engine has a "replicated" version, ReplicatedMergeTree (or SharedMergeTree if in the cloud).
Replication also requires ClickHouse Keeper (a centralized service for reliable distributed coordination, similar to Apache ZooKeeper) which tracks the state of each replica to keep them in sync; it is typically run on a separate machine (but can also be executed within clickhouse-server processes).
Server Hosts (servers) are the hardware (cloud/on-premises) that makes up the machines (more CPU cores improve data ingestion speeds, faster disks improve query performance, more memory helps with high-cardinality data, where more sorting is needed); whereas Database Hosts (hosts) are the running instances of ClickHouse (multiple hosts can run on a single server). A Cluster, not to be confused with the physical cluster of servers, is a user-defined logical collection of >=1 shards (each of which consists of replicas).
Each ClickHouse host/instance contains a single replica of a single shard. To have a single physical host contain multiple ClickHouse instances, virtualization (Docker, etc.) must be used.
While the data is replicated across the replicas automatically, the shards are disconnected from one another, and so in order to access the whole table (split between shards), a table with a Distributed engine must be created (using the same schema as the sharded table). Note that this table only needs to be created on one of the hosts.
Querying this table forwards the query to one replica of each shard, with each replica processing its part of the data and sending back its results to be combined into the final result. Note that inserting data into the local table (not the Distributed one) will replicate automatically to all replicas of the shard but will not load balance the request across the shards (for that, insert into the Distributed table).
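A minimal sketch of a Distributed table, assuming a hypothetical cluster named cluster_2S_1R and a local sharded table default.trips_local (rand() is just one possible sharding key):
-- Copies the schema of the local table and routes queries/inserts across the shards
CREATE TABLE trips_distributed AS default.trips_local
ENGINE = Distributed('cluster_2S_1R', 'default', 'trips_local', rand());

-- Reads fan out to one replica per shard and the partial results are combined
SELECT count() FROM trips_distributed;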
There are two methods of creating a distributed MV:
- Define the MV locally on each replica, leading to the MVs processing locally with no added network overhead; with a distributed table being defined for easy querying of the multiple view tables.
- Define a distributed MV of a distributed table (can specify sharding key), leading to any activity on the distributed table propagating to the MV
ClickHouse Cloud uses a cloud-native replacement for ReplicatedMergeTree called SharedMergeTree (it automatically converts user-defined engines to their SharedMergeTree equivalent), which works with shared storage (S3, GCS, etc.) and thereby does not require sharding (every table has 1 shard), provides a greater separation of compute and storage, and faster replication, mutations, and merges.
https://clickhouse.com/docs/en/guides/joining-tables
ClickHouse supports all SQL JOINs; the difference comes from how ClickHouse handles the right vs. left tables in the join. Joining large datasets requires a lot of memory and CPU power, and so to ensure maximum utilization of resources ClickHouse has 6 different join algorithms:
The default is direct, but if a Dictionary or Join table engine is not in place for the right-hand table, it falls back to hash (which is memory bound and so can fail).
SELECT *
FROM actor AS a
JOIN roles AS r ON a.id = r.actor_id
SETTINGS join_algorithm = 'hash'
A hash table is built in memory from the right-hand table: 'hash' builds a single hash table, while 'parallel_hash' buckets the data and builds several hash tables ('grace_hash' is similar to 'parallel_hash' but limits the memory usage). The data in the right-hand table is streamed (in parallel) into memory, and the data in the left-hand table is streamed and joined by doing lookups into the hash table.
Note that the right-hand table must fit in memory; if not, an exception occurs and the JOIN fails (so put the smaller table on the right side of the JOIN).
These join algorithms require the data to be sorted first, with 'full_sorting_merge' requiring both tables to be sorted before joining (classical sort-merge) while 'partial_merge' requires only the right-hand table to be sorted before joining. To achieve the best performance for these algorithms, tables should be sorted (ORDER BY) on the attributes used in their join statement.
Note that sorting takes place in memory if possible (otherwise it spills to disk). Also, 'full_sorting_merge' can have performance similar to 'hash' but uses much less memory.
A Dictionary is a special type of key-value "table" (typically) stored in memory, tied to a source (rows/mappings come from another place like local file, executable file, http(s), etc.), periodically updated.
CREATE DICTIONARY uk_mortgage_rates
(
`date` DateTime64,
`variable` Decimal32(2),
`fixed` Decimal32(2),
`bank` Decimal32(2)
)
PRIMARY KEY date
SOURCE(HTTP(URL 'https://learnclickhouse.s3.us-east-2.amazonaws.com/datasets/mortgage_rates.csv' FORMAT 'CSVWithNames'))
LIFETIME(MIN 0 MAX 2628000000)
LAYOUT(COMPLEX_KEY_HASHED())
SETTINGS(date_time_input_format = 'best_effort')
Once the dictionary is set up, querying it is done using dictGet(<dictionary_name>, <attribute_name>, <value_of_key>), thereby performing a role similar to a join.
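A minimal sketch against the uk_mortgage_rates dictionary defined above; because the layout is COMPLEX_KEY_HASHED, the key is passed as a tuple (the date literal is illustrative and only returns a value if that exact key exists):
SELECT
    dictGet('uk_mortgage_rates', 'fixed',    tuple(toDateTime64('2023-01-01 00:00:00', 3))) AS fixed_rate,
    dictGet('uk_mortgage_rates', 'variable', tuple(toDateTime64('2023-01-01 00:00:00', 3))) AS variable_rate;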
If the updating mechanism is not required, the Join table engine (the right-hand table is stored in memory) can be used. A table is specified with ENGINE = Join(join_strictness, join_type, keys), where join_strictness and join_type allow ClickHouse to take advantage of the user's knowledge about how the table will be joined to optimise execution time and memory usage.
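A minimal sketch of the Join engine (hypothetical table); the stored values can also be fetched without writing a JOIN at all by using joinGet:
-- Right-hand side kept in memory, keyed by id, ready for ANY LEFT JOINs
CREATE TABLE user_names
(
    id   UInt64,
    name String
)
ENGINE = Join(ANY, LEFT, id);

INSERT INTO user_names VALUES (1, 'Alice'), (2, 'Bob');

-- Read a single value directly from the in-memory structure
SELECT joinGet('user_names', 'name', toUInt64(1)) AS name;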
There is also the EmbeddedRocksDB table engine (the right-hand table is a RocksDB table): https://clickhouse.com/docs/en/engines/table-engines/integrations/embedded-rocksdb, https://rocksdb.org/. A special direct join with EmbeddedRocksDB tables is supported, which avoids forming a hash table in memory and accesses the data directly from the database.
Parts in ClickHouse are immutable files, so updates/deletions can only occur when parts merge (a heavy-weight operation). These require the ALTER TABLE command (ALTER TABLE random UPDATE y = 'hello' WHERE x > 10;, etc.; note however that updates cannot occur on a primary key column), called a mutation, which does not execute immediately (like an event) but is kept (inside system.mutations) until the next merge where it is realized (the client can be made to wait until the mutation is realized by setting mutations_sync = 1 or 2).
Mutations execute in the order they were created, and each part is processed in that order (data inserted after the mutation is not mutated); if a mutation gets stuck, it can be stopped using KILL MUTATION. Mutations in replicated tables are handled through ZooKeeper (or ClickHouse Keeper).
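A minimal sketch of issuing a mutation and checking on it in system.mutations (the table name and mutation_id are hypothetical):
-- Schedule a mutation; the affected parts are rewritten in the background
ALTER TABLE random DELETE WHERE x > 10;

-- Inspect pending mutations (is_done = 0 means not yet applied everywhere)
SELECT mutation_id, command, is_done, latest_fail_reason
FROM system.mutations
WHERE (table = 'random') AND (is_done = 0);

-- If one is stuck, it can be cancelled
KILL MUTATION WHERE (table = 'random') AND (mutation_id = 'mutation_3.txt');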
Only available in ClickHouse Cloud, Lightweight Mutations leverage hidden columns to apply certain mutations dynamically, so their effects appear immediately (no need to wait for the next merge).
Lightweight Delete uses DELETE FROM my_table WHERE y != 'hello';, a different syntax to that of a mutation (more a lightweight operation than a mutation). This marks the deleted rows using a special hidden column, with queries automatically rewritten to exclude them (they are eventually deleted during the next merge).
Lightweight Updates use the same syntax as mutation updates but require setting SET apply_mutations_on_fly = 1;, causing the rows to appear updated immediately (they have not actually been rewritten, but will be at the next merge). Frequent lightweight updates can have a negative impact on performance.
When data is updated frequently, it is not performant to use (lightweight) mutations; instead, a combination of upserting/re-inserting data with special table engines (designed for deleting duplicated records) can be used (these work on both ClickHouse Cloud and open source):
While these have the correct internal logic, similar to the aforementioned options the data is not truly updated until the next merge is initiated (queries may return unexpected results). To fix this, FROM my_table FINAL is used to indicate to ClickHouse that the proper representation of the data is desired.
- CollapsingMergeTree(T) tables must have a sign Int8 attribute on which the engine collapses. sign = 1 means the row is a state of an object, whereas sign = -1 means the cancellation of the state of an object with the same attributes. These then act similar (but not entirely equal) to replace and delete operations. Note that this engine allows only strictly consecutive insertions.
- VersionedCollapsingMergeTree(T, U) works like CollapsingMergeTree but allows out-of-order insertions (using multiple threads) by having an additional version attribute (commonly a timestamp).
- ReplacingMergeTree, similar to VersionedCollapsingMergeTree, can have an optional version attribute to prevent race conditions (see the sketch after this list).
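A minimal sketch of the upsert pattern with ReplacingMergeTree (hypothetical table); the row with the highest version per key wins once parts merge, and FINAL forces the deduplicated view before that:
CREATE TABLE user_profiles
(
    user_id UInt64,
    email   String,
    updated DateTime  -- used as the version column
)
ENGINE = ReplacingMergeTree(updated)
ORDER BY user_id;

-- "Update" by re-inserting the full row with a newer version
INSERT INTO user_profiles VALUES (1, 'old@example.com', now() - INTERVAL 1 DAY);
INSERT INTO user_profiles VALUES (1, 'new@example.com', now());

-- FINAL collapses duplicates at query time, before the background merge happens
SELECT * FROM user_profiles FINAL WHERE user_id = 1;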
ClickHouse compresses MergeTree data automatically before writing it to storage (the default is LZ4). Compression can be set globally (via config.xml files) or at the column level via a CODEC(compression1[, compression2, ...]) suffix to the column definitions (a compression pipeline where data -> compression1 -> compression2 -> ... -> compressed column): https://clickhouse.com/docs/en/sql-reference/statements/create/table#column_compression_codec
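A minimal sketch of column-level codec pipelines (hypothetical table); Delta is a specialized codec that works well before a general-purpose one on monotonically increasing values:
CREATE TABLE sensor_readings
(
    ts    DateTime CODEC(Delta, ZSTD),  -- delta-encode, then ZSTD-compress
    value Float64  CODEC(Gorilla),      -- specialized floating-point codec
    note  String   CODEC(LZ4)           -- explicitly the default codec
)
ENGINE = MergeTree
ORDER BY ts;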
There are further specialized codecs which, instead of compressing the data, encrypt it on disk. These are only available when an encryption key is specified by encryption settings (within the config.xml file). Encryption should only be at the end of codec pipelines, as encrypted data usually cannot be compressed in any meaningful way (it is evenly distributed and high-entropy).
AES-128-GCM-SIV or AES-256-GCM-SIV will encrypt the data with AES-128/256 in GCM-SIV mode (RFC 8452, https://datatracker.ietf.org/doc/html/rfc8452, for the 128-bit version).
These codecs use a fixed nonce and encryption is therefore deterministic. This makes it compatible with deduplicating engines such as ReplicatedMergeTree. However, when the same data block is encrypted twice, the resulting ciphertext will be exactly the same so an adversary who can read the disk can see this equivalence (although only the equivalence, without getting its content).
Notes to keep in mind:
- Most engines including the "*MergeTree" family create index files on disk without applying codecs. This means plaintext will appear on disk if an encrypted column is indexed.
- When performing a SELECT query that mentions a specific value in an encrypted column (such as in its WHERE clause), the value may appear in system.query_log (disable the logging to prevent unintended disclosure).
A lifetime, TTL, can be configured for MergeTree tables and/or columns by using the TTL interval [TO [VOLUME|DISK] name] [, interval ..., ...] clause to define a time interval. By default, a TTL on a table causes the affected rows to be deleted; however, it can be configured so parts will be moved, compressed, or aggregated based on a TTL criterion (if all rows in the part satisfy the TTL).
This can be used to create a hot/cold architecture within ClickHouse; where data is continuously moved into higher-compression, slower retrieval disks/volumes as it gets older:
CREATE TABLE my_table(
id Int,
x Decimal64(2),
y Decimal32(2),
sum_x Decimal256(2),
max_y Decimal128(2),
timestamp DateTime -- set to now() on inserts
)
ENGINE = MergeTree
ORDER BY id
TTL timestamp TO VOLUME 'hot', -- has to depend on a column of the table.
timestamp + INTERVAL 1 HOUR TO VOLUME 'warm',
timestamp + INTERVAL 1 DAY TO VOLUME 'cold', -- move data to colder volumes as they become older
timestamp + INTERVAL 1 WEEK TO VOLUME 'frozen',
timestamp + INTERVAL 1 WEEK GROUP BY id SET sum_x = sum(x), max_y = max(y), -- Rollup: contain GROUP BY ... SET that uses aggregate functions on columns
timestamp + INTERVAL 1 MONTH -- will delete part where all rows are older than a month
Note that volumes and disks must be defined in the storage policies of the ClickHouse server (inside config.d
) https://clickhouse.com/docs/en/engines/table-engines/mergetree-family/mergetree#table_engine-mergetree-multiple-volumes_configure.
TTL rules can be appended to an existing table/column using ALTER TABLE my_table MODIFY COLUMN name type TTL ...; applying all TTL rules to existing rows can be done using ALTER TABLE my_table MATERIALIZE TTL. Note that TTL actions occur at the next merge (for them to happen immediately, use OPTIMIZE TABLE my_table FINAL).
https://clickhouse.com/docs/en/sql-reference/statements/alter/projection
Projections are additional (hidden) per-part/partition tables, automatically kept in sync with the original table (similar to a materialized view for a part or partition), with ClickHouse deciding at query time whether or not to use them to reduce the number of rows scanned. They are beneficial when running queries on non-primary-key columns or running total aggregates.
ALTER TABLE uk_price_paid
ADD PROJECTION uk_price_paid_projection_1 (
SELECT town, price, date, street, locality -- can pick all columns or just the necessary ones for a commonly used query
ORDER BY town -- sort the data in the projection by town
);
-- note that this adds a mutation to the table
ALTER TABLE uk_price_paid
MATERIALIZE PROJECTION uk_price_paid_projection_1; -- apply given projection (mutation) on past data
ALTER TABLE uk_price_paid
ADD PROJECTION uk_price_paid_projection_2 (
SELECT town, max(price) -- aggregate is stored for each part
GROUP BY town -- aggregated by town
);
When a projection is made, an anonymous *MergeTree table is created and stored in a subfolder of the part folder. This adds overhead for storage, inserts, merges, etc. (similar to materialized view overhead) but substantially increases retrieval speeds.
https://clickhouse.com/docs/en/optimize/skipping-indexes#skip-index-types
A Skipping Index is an additional index that a MergeTree table can define; it does not store the data twice (unlike materialized views or projections) or sort the data differently. Instead, it is built on the existing sort order and the existing granules (over blocks of one or more granules), and ClickHouse can use this additional index to identify whether a block contains the requested data or not, hence skipping large chunks of data:
ALTER TABLE uk_price_paid
ADD INDEX uk_price_paid_index_1 town -- attribute to apply the index on
TYPE minmax -- index type
GRANULARITY 1; -- the block size (n x granule_size)
-- note that this adds a mutation to the table
ALTER TABLE uk_price_paid
MATERIALIZE INDEX uk_price_paid_index_1; -- apply given index (mutation) on past data
Queries can be forced to use the skipping index through the force_data_skipping_indices setting.
- minmax stores the minimum and maximum values of the index expression for each block.
- set accepts a single parameter of the maximum size of the value set per block (0 permits an unlimited number of discrete values), and stores a set containing all unique values in the block (see the sketch after this list).
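A minimal sketch of a set skipping index (the column and parameters are illustrative):
ALTER TABLE uk_price_paid
    ADD INDEX district_set_idx district  -- column the index is built on
    TYPE set(100)                        -- store up to 100 unique values per block
    GRANULARITY 4;                       -- each block covers 4 granules

ALTER TABLE uk_price_paid MATERIALIZE INDEX district_set_idx;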
A Bloom filter is a space-efficient probabilistic data structure for testing set membership (is 'a' in ('1','b','c')) at the cost of a slight chance of false positives (False or Probably True). Because Bloom filters can more efficiently handle testing for a large number of discrete values, they can be appropriate for conditional expressions that produce more values to test. In particular, a Bloom filter index can be applied to arrays, where every value of the array is tested, and to maps, by converting either the keys or values to an array using the mapKeys or mapValues function. There are three Data Skipping Index types based on Bloom filter:
- bloom_filter takes a single optional parameter: the allowed "false positive" rate between 0 and 1 (if unspecified, 0.025 is used).
- tokenbf_v1 takes three parameters, all related to tuning the bloom filter used: the size of the filter in bytes (larger filters have fewer false positives, at some cost in storage), the number of hash functions applied (again, more hash functions reduce false positives), and the seed for the bloom filter hash functions. The calculator at https://hur.st/bloomfilter/ can be used for more details. This index works only with String, FixedString, and Map datatypes. The input expression is split into character sequences separated by non-alphanumeric characters (a column value of This is a candidate for a "full text" search will contain the tokens This, is, a, candidate, for, full, text, search). It is intended for use in LIKE, EQUALS, IN, hasToken() and similar searches for words and other values within longer strings (see the sketch after this list).
- ngrambf_v1 functions the same as tokenbf_v1, but takes one additional parameter before the Bloom filter settings: the size of the ngrams (character strings of length n) to index, instead of tokenbf_v1's character sequences separated by non-alphanumeric characters (the string A_short_string with an ngram size of 4 would be indexed as A_sh, _sho, shor, hort, ort_, rt_s, t_st, _str, stri, trin, ring).
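A minimal sketch of a tokenbf_v1 index on a string column (the filter size, hash count, and seed are illustrative values):
ALTER TABLE uk_price_paid
    ADD INDEX street_tokens_idx street
    TYPE tokenbf_v1(8192, 3, 0)  -- 8192-byte filter, 3 hash functions, seed 0
    GRANULARITY 4;

ALTER TABLE uk_price_paid MATERIALIZE INDEX street_tokens_idx;

-- Token-based searches can now skip blocks that cannot contain the word
SELECT count() FROM uk_price_paid WHERE hasToken(street, 'STATION');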
- Too many parts: Each part represents a folder of files on the filesystem (the maximum is set by max_parts_in_total, default 100000). The most common causes of the "too many parts" error are:
  - too many partitions (poor user design) - only partition by month (or by day if there is a lot of data)
  - too many small inserts - enable async inserts (SETTINGS async_insert=1)
  - too many materialized views - try consolidating aggregates into a smaller number of views
- Scaling horizontally too early: ClickHouse will try to use all of a machine's resources and is often deployed on servers with hundreds of cores, terabytes of RAM, and petabytes of disk space. 2 machines are for redundancy, but go vertical before going horizontal.
- Using frequent mutations (ALTER TABLE queries, etc.): Mutations can cause a backlog of merges, which leads to too many parts, replication delays, and frequent CPU- and IO-intensive merges. Use deduplication strategies instead.
- Unnecessary use of complex types: It is recommended to use primitive types where possible (they offer the best insertion and query-time performance) and to use the more complex data types (Nested, Tuple, Map, JSON, etc.) only when necessary. Avoid Nullable (only use it when needed by the business logic).
- Choosing a poor primary key: Select columns which will often be filtered on (typically no more than 2-3), and order the columns in the primary key by their cardinality in ascending order.
- Overuse of data skipping indices: It is common to have skipping indexes that simply complicate table design, slow insert performance, and rarely improve query performance. Skip indices should only be considered once other alternatives have been exhausted.
- Thinking LIMIT will speed up a query: It might speed up the query, but typically it does not (aggregates usually need to read all rows anyway). If grouping by the primary key, try setting optimize_aggregation_in_order=1 with a LIMIT; then it will shortcut the query.
- Issues relating to Materialized Views: ensure it is understood how they work; they only trigger on INSERT and have no visibility of merges, partition drops, or mutations; the ORDER BY clause of the target table must be consistent with the GROUP BY of the SELECT clause; the column names of the materialized view's SELECT must match those of the destination table; avoid attaching too many materialized views to a single table. Many -State functions (especially quantile states) can be CPU intensive and lead to slow inserts.
- Use dictionaries with JOINs: No matter which JOIN technique is used, an attempt is made to load the right-hand table into memory. Dictionaries are already in memory (when possible), so whenever possible use a dictionary for the right-hand side of all JOINs.
- Avoid Float* when possible: If possible, use integer types (more accurate, much faster to process, etc.) by storing 2 columns (e.g. dollars and cents). If floating-point values must be stored, use Decimal data types (which maintain accuracy).
- Understand how projections work: Make sure the benefits of a projection are worth the tradeoff (extra storage and computation vs. the gain in query performance).
- UUID and IP addresses: Do not store these as String data types; use the dedicated UUID and IP data types instead (more efficient and faster).
- Prefer LowCardinality data types to Enum: Enums are great, but changing them requires altering the table (a DDL change), whereas LowCardinality has a similar performance benefit but can easily accept new values in the column.
- Flatten data: Even better than joining data at query time, join the data at insertion time.
- Use materialized views: Instead of computing values with each query, define a materialized view (computed at insertion). Define multiple MVs for a single table (or create views of views).
- Don't overuse regular views: they are simply stored queries and cannot take advantage of some of the query optimizations that ClickHouse performs on queries.
- View the query details: Use the EXPLAIN statement to view the execution plan of a query.
- The read-only table error: "Table is in readonly mode" almost always means the ClickHouse Keeper (or ZooKeeper) is not running properly (or is unreachable).
- ClickHouse not starting up properly: Check the /metadata folder containing a collection of .sql files that are executed at startup. Delete a file if it is causing an issue (if comfortable with losing that metadata). Also useful if you want to delete a table without having to start up the server.
Continuous ingestion into ClickHouse from S3 (https://clickhouse.com/videos/loading-s3-data-into-clickhouse) can be achieved by having a materialized view read (new) data coming in from an S3 bucket (new files) through an S3Queue (https://clickhouse.com/docs/en/engines/table-engines/integrations/s3queue) table; forming blocks of rows that it will then insert into the final table.
The S3Queue table engine provides integration with the Amazon S3 ecosystem (S3-specific features) and allows streaming import (similar to the Kafka engine). In order to keep track of processed files (so it is able to distinguish between processed and new data inside S3), it requires one or more ClickHouse Keeper (or ZooKeeper) nodes (it cannot work otherwise); the simplest solution is to have a single combined ClickHouse Keeper and Database Host node (see the example below and the Sharding and Replication section).
ClickHouse Keeper (or ZooKeeper) config files:
<clickhouse>
<keeper_server>
<tcp_port>9181</tcp_port>
<server_id>1</server_id>
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
<snapshot_storage_path>/var/lib/clickhouse/coordination/snapshots</snapshot_storage_path>
<coordination_settings>
<operation_timeout_ms>10000</operation_timeout_ms>
<session_timeout_ms>30000</session_timeout_ms>
<raft_logs_level>trace</raft_logs_level>
</coordination_settings>
<raft_configuration>
<server>
<id>1</id>
<hostname>chnode1</hostname>
<port>9234</port>
</server>
</raft_configuration>
</keeper_server>
</clickhouse>

<clickhouse>
<zookeeper>
<node index="1">
<host>chnode1</host>
<port>9181</port>
</node>
</zookeeper>
</clickhouse>

<clickhouse>
<macros>
<shard>1</shard>
<replica>replica_1</replica>
</macros>
</clickhouse>
ClickHouse Database Host config files:
<clickhouse>
<logger>
<level>debug</level>
<log>/var/log/clickhouse-server/clickhouse-server.log</log>
<errorlog>/var/log/clickhouse-server/clickhouse-server.err.log</errorlog>
<size>1000M</size>
<count>3</count>
</logger>
<display_name>clickhouse</display_name>
<listen_host>0.0.0.0</listen_host>
<http_port>8123</http_port>
<tcp_port>9000</tcp_port>
<interserver_http_port>9009</interserver_http_port>
</clickhouse>

<clickhouse>
<remote_servers replace="true">
<cluster_2S_1R>
<secret>mysecretphrase</secret>
<shard>
<internal_replication>true</internal_replication>
<replica>
<host>chnode1</host>
<port>9000</port>
</replica>
</shard>
</cluster_2S_1R>
</remote_servers>
</clickhouse>

<!-- can specify a "named collection" called "s3queue_conf" -->
<clickhouse>
<named_collections>
<s3queue_conf>
<url>https://clickhouse-s3-bucket.s3.us-east-1.amazonaws.com/*.csv</url>
<access_key_id>ACCESS KEY</access_key_id>
<secret_access_key>KEY SECRET</secret_access_key>
</s3queue_conf>
</named_collections>
</clickhouse>
After initialising the ClickHouse instance and its Keeper, the following procedure needs to be executed to create an S3Queue table (ingesting all new files uploaded to the S3 bucket), a destination table (the final destination of the data), and a materialized view (consuming the data from the S3Queue table and inserting it into the destination table):
CREATE TABLE my_table_s3queue
(
column1 FixedString(1),
column2 UInt32,
column3 String
)
ENGINE = S3Queue('https://clickhouse-s3-bucket.s3.us-east-1.amazonaws.com/*.csv', 'ACCESS KEY', 'KEY SECRET', 'CSVWithNames')
-- or ENGINE = S3Queue(s3queue_conf, format = 'CSVWithNames')
SETTINGS
mode = 'ordered'; -- assumes files' names in bucket are in chronological order, keeps track of "latest" name
-- or mode = 'unordered'; -- keeps track of all processed files
CREATE TABLE my_table
(
column1 FixedString(1),
column2 UInt32,
column3 String
)
ENGINE = MergeTree
ORDER BY (column1, column2)

CREATE MATERIALIZED VIEW my_table_materialized_view TO my_table AS
SELECT *
FROM my_table_s3queue;
The materialized view reads the data coming in from the S3 bucket (through the S3Queue table) and forms blocks of rows that it then inserts into the destination table. It flushes the rows into the table based on the min insert block size configs (SHOW SETTINGS LIKE 'min_insert_block_size%').
This code snippet shows how to create a parameterized view that calculates the compressed and uncompressed size used by the tables specified (by regex or explicitly):
CREATE VIEW memory_usage_per_tables AS (
SELECT
table,
formatReadableSize(sum(data_compressed_bytes)) AS compressed_size,
formatReadableSize(sum(data_uncompressed_bytes)) AS uncompressed_size,
count() AS num_of_active_parts
FROM system.parts
WHERE (active = 1) AND (arrayExists(re -> match(table, re), {table_names:Array(String)}))
GROUP BY table
);
SELECT *
FROM memory_usage_per_tables(table_names = ['uk_prices_*'])
Query id: d8763791-c10d-42bd-afd2-d2ffd4a39a92
┌─table────────────┬─compressed_size─┬─uncompressed_size─┬─num_of_active_parts─┐
1. │ uk_prices_hf │ 7.74 GiB │ 14.41 GiB │ 14 │
2. │ uk_prices_daily │ 2.34 MiB │ 104.59 MiB │ 5 │
3. │ uk_prices_yearly │ 29.61 KiB │ 61.00 KiB │ 1 │
└──────────────────┴─────────────────┴───────────────────┴─────────────────────┘
3 rows in set. Elapsed: 0.020 sec.