ClickHouse as a storage backend #1438
Comments
Is anyone working on this? My team or I could give this a shot. |
As far as I know, no one is working on this. |
@bzon I can join you as a tester. |
Sure! |
Happy to see someone working on this :) |
@bzon would be good if you post an architecture here, specifically how you would lay out the data, ingestion, etc. To my knowledge, clickhouse requires batched writes, and it may even be up to you to decide which node to send the writes to, so there are many questions. It may require some benchmarking to find the optimal design. |
@yurishkuro at the moment, we have zero knowledge of the internals of Jaeger. I think benchmarking should be the first step, to see whether this integration is feasible. With that said, the first requirement should be creating the right table schema. |
I think architecture of project https://github.com/flant/loghouse could be a source of inspiration… |
This webinar could be interesting: A Practical Introduction to Handling Log Data in ClickHouse |
I took a stab at it (very early WIP): This is the schema I used:
CREATE TABLE jaeger_index (
    timestamp DateTime64(6),
    traceID FixedString(16),
    service LowCardinality(String),
    operation LowCardinality(String),
    durationUs UInt64,
    tags Nested(
        key LowCardinality(String),
        valueString LowCardinality(String),
        valueBool UInt8,
        valueInt Int64,
        valueFloat Float64
    ),
    INDEX tags_strings (tags.key, tags.valueString) TYPE set(0) GRANULARITY 64,
    INDEX tags_ints (tags.key, tags.valueInt) TYPE set(0) GRANULARITY 64
) ENGINE MergeTree() PARTITION BY toDate(timestamp) ORDER BY (timestamp, service, operation);
CREATE TABLE jaeger_spans (
    timestamp DateTime64(6),
    traceID FixedString(16),
    model String
) ENGINE MergeTree() PARTITION BY toDate(timestamp) ORDER BY traceID;
You probably need Clickhouse 20.x for
Index table looks like this:
SELECT *
FROM jaeger_index
ARRAY JOIN tags
ORDER BY timestamp DESC
LIMIT 20
FORMAT PrettyCompactMonoBlock
Tags are stored in their original types, so with enough SQL-fu you can find all spans with response size between X and Y bytes, for example. The layout of the query is different from Elasticsearch, since now you have all tags for the trace laid out on a single view. This means that if you search for all operations of some service, you will get a cross-span result where one tag can match one span, and another tag can match another span. Consider the following trace:
You can search for
There's support for both JSON and Protobuf storage. The former allows out-of-band queries, since Clickhouse supports JSON functions. The latter is much more compact. I pushed 100K spans from tracegen through this with a local Clickhouse in a Docker container with stock settings, and here's what storage looks like:
SELECT
table,
sum(marks) AS marks,
sum(rows) AS rows,
sum(bytes_on_disk) AS bytes_on_disk,
sum(data_compressed_bytes) AS data_compressed_bytes,
sum(data_uncompressed_bytes) AS data_uncompressed_bytes,
toDecimal64(data_uncompressed_bytes / data_compressed_bytes, 2) AS compression_ratio,
toDecimal64(data_compressed_bytes / rows, 2) AS compressed_bytes_per_row
FROM system.parts
WHERE table LIKE 'jaeger_%'
GROUP BY table
ORDER BY table ASC
SELECT
table,
column,
type,
sum(column_data_compressed_bytes) AS compressed,
sum(column_data_uncompressed_bytes) AS uncompressed,
toDecimal64(uncompressed / compressed, 2) AS compression_ratio,
sum(rows) AS rows,
toDecimal64(compressed / rows, 2) AS bytes_per_row
FROM system.parts_columns
WHERE (table LIKE 'jaeger_%') AND active
GROUP BY
table,
column,
type
ORDER BY
table ASC,
column ASC
We have around 74B daily docs in our production Elasticsearch storage. My plan is to switch that to fields-as-tags, remove indexing of non-queried fields (logs, nested tags, references), then switch to a sorted index and then see how Clickhouse compares to that for the same spans. |
@bobrik very interesting, thanks for sharing. I am curious what the performance for retrieving by trace ID would be like. Q: why do you use LowCardinality(String) for tags? Some tags can be very high cardinality, e.g. URLs.
I'm confused why this would be the case. Doesn't CH evaluate the query in full against each row (i.e. each span)? Or is this because how your plugin interacts with CH? |
@yurishkuro even at 100k cardinality per block, LowCardinality(String) with dictionary-based encoding will be better than plain String |
@yurishkuro retrieving by trace ID is very fast, since you're doing a primary key lookup. ClickHouse/ClickHouse#4074 (comment) says this about
That said, the schema is in no way final.
The row is not a span, it's a span-key-value combination. That's the key. Take a look at the output of this query, which is an equivalent of what I do:
SELECT *
FROM jaeger_index
ARRAY JOIN tags
ORDER BY timestamp DESC
LIMIT 20
FORMAT PrettyCompactMonoBlock
Here we join traceID with nested tags, repeating every tag key and value for the corresponding trace. Operation
Compare this to how it looks at span level (this is what Elasticsearch works with):
SELECT *
FROM jaeger_index
WHERE (traceID = '3adb641936b21d98') AND (service = 'jaeger-query') AND (operation = 'getTraces')
Clickhouse doesn't allow direct lookups against nested fields like |
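To make the span-key-value layout described above more concrete, here is a minimal Python sketch of what ARRAY JOIN does to rows with nested tags, and why a trace-level search can be satisfied by tags that live on different spans. The sample spans, the `array_join` helper and the two match functions are hypothetical illustrations, not code from the plugin.

```python
from collections import defaultdict

# Hypothetical spans: (trace_id, operation, tags). Not real Jaeger data.
SPANS = [
    ("t1", "getTraces", {"http.status_code": 200, "component": "grpc"}),
    ("t1", "queryDB", {"db.type": "sql", "error": True}),
    ("t2", "getTraces", {"http.status_code": 500, "component": "grpc"}),
]


def array_join(spans):
    """Emulate ARRAY JOIN: emit one row per (span, tag key, tag value)."""
    for trace_id, operation, tags in spans:
        for key, value in tags.items():
            yield trace_id, operation, key, value


def traces_matching_any_span(spans, wanted):
    """Trace-level match: each wanted tag may come from a different span."""
    seen = defaultdict(set)
    for trace_id, _operation, key, value in array_join(spans):
        if key in wanted and wanted[key] == value:
            seen[trace_id].add(key)
    return sorted(t for t, keys in seen.items() if keys == set(wanted))


def traces_matching_one_span(spans, wanted):
    """Span-level match: a single span must carry every wanted tag."""
    return sorted({
        trace_id
        for trace_id, _operation, tags in spans
        if all(k in tags and tags[k] == v for k, v in wanted.items())
    })
```

With `wanted = {"component": "grpc", "error": True}`, the trace-level function finds trace `t1` because the two tags sit on two different spans of that trace, while the span-level function finds nothing.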
I think I might be more confused about tag queries that I initially realized, please take my explanation with a big grain of salt. |
Turns out that having a sparse
I've tried combining span storage and indexing into one table, but that proved to be very detrimental to performance of lookup by trace id. It compressed somewhat better, though.
If anyone wants to give it a spin, please be my guest:
It feels a lot snappier than Elasticsearch, and it's at least 2.5x more compact (4.4x if you compare to Elasticsearch behaviour out of the box). |
@bobrik What is blocking a pull request? |
There were some issues with ClickHouse that prevented me from getting reasonable latency for the amount of data we have. This is the main one: See also:
There are also quite a few local changes that I haven't pushed yet. I'll see if I can carve out some more time for this to push the latest changes, but no promises on ETA. |
Another question. Maybe it makes sense to divide the settings of tables
This will take the load off Jaeger when it collects the batches (no need to make large batches on high load). |
It seems that all the necessary changes have been made in the latest versions of ClickHouse. |
Yes and no. The code is pretty solid and it's been running on a single-node Clickhouse for the last month, but I'm waiting on an internal cluster provisioning to finish so I can test it more broadly with real humans making queries. My plan is to add more tests and make a PR upstream after that. The latest code is here: |
@bobrik Impressive. I can test this on my ClickHouse cluster. A few questions:
|
First, here's how stock Elasticsearch backend compares to our modified one (2x replication in both cases):
We don't use nested docs and sort the index, which:
As you can see, this was back in May. Now we can compare improved Elasticsearch to Clickhouse:
SELECT
table,
partition,
sum(marks) AS marks,
sum(rows) AS rows,
formatReadableSize(sum(data_compressed_bytes)) AS compressed_bytes,
formatReadableSize(sum(data_uncompressed_bytes)) AS uncompressed_bytes,
toDecimal64(sum(data_uncompressed_bytes) / sum(data_compressed_bytes), 2) AS compression_ratio,
formatReadableSize(sum(data_compressed_bytes) / rows) AS bytes_per_row,
formatReadableSize(sum(primary_key_bytes_in_memory)) AS pk_in_memory
FROM system.parts
WHERE (table IN ('jaeger_index_v2', 'jaeger_spans_v2', 'jaeger_archive_spans_v2', '.inner.jaeger_operations_v2'))
AND active
AND partition = '2020-08-28'
GROUP BY table, partition
ORDER BY table ASC, partition ASC
┌─table───────────┬─partition──┬────marks─┬────────rows─┬─compressed_bytes─┬─uncompressed_bytes─┬─compression_ratio─┬─bytes_per_row─┬─pk_in_memory─┐
│ jaeger_index_v2 │ 2020-08-28 │ 13401691 │ 13723301235 │ 294.31 GiB │ 2.75 TiB │ 9.56 │ 23.03 B │ 115.04 MiB │
│ jaeger_spans_v2 │ 2020-08-28 │ 13401693 │ 13723301235 │ 757.75 GiB │ 4.55 TiB │ 6.14 │ 59.29 B │ 358.88 MiB │
└─────────────────┴────────────┴──────────┴─────────────┴──────────────────┴────────────────────┴───────────────────┴───────────────┴──────────────┘
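As a quick sanity check on the table above, the `bytes_per_row` column can be recomputed from the rounded sizes and the row count. This is only a sketch of the arithmetic; the helper name is made up, and it assumes `formatReadableSize` reports binary units (1 GiB = 1024^3 bytes).

```python
GIB = 1024 ** 3  # formatReadableSize reports binary units (GiB), assumed here


def bytes_per_row(compressed_gib, rows):
    """Average compressed bytes per row, from a size in GiB and a row count."""
    return compressed_gib * GIB / rows


# Values copied from the 2020-08-28 partition in the table above.
ROWS = 13_723_301_235
index_bpr = bytes_per_row(294.31, ROWS)  # jaeger_index_v2
spans_bpr = bytes_per_row(757.75, ROWS)  # jaeger_spans_v2

# Each span costs one index row plus one span row.
total_per_span = index_bpr + spans_bpr
```

Rounded to two decimals, the two results agree with the 23.03 B and 59.29 B shown in the table, which puts the combined cost at roughly 82 compressed bytes per span for that day.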
Set
No, there are multitudes of ways to organize tables and you may want to tweak table settings yourself, that's why I only provide an example starting schema. This is also part of the reason I want to test on our production cluster rather than on a single node before I make a PR. Please consult the docs: https://github.com/bobrik/jaeger/tree/ivan/clickhouse/plugin/storage/clickhouse#schema Keep in mind that you need at least Clickhouse v20.7.1.4189 to have reasonable performance. |
It's very promising! |
The first and second proxies are not suitable because both use the HTTP protocol. |
Oh, your backend uses ClickHouse's native TCP protocol! Yes, you use https://github.com/ClickHouse/clickhouse-go, which speaks the native protocol. |
@jkowall why do you think ClickHouse doesn't scale out? Near-linear scalability is available in ClickHouse itself by design: just add a shard with hosts, after which you can insert into any host and the data will be spread according to the sharding key. Alternatively, you can use chproxy or haproxy to ingest data directly into the MergeTree tables. You can also read from all servers, with smarter aggregation, through a Distributed table. |
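The sharding-key routing mentioned above can be sketched in a few lines of Python. A ClickHouse Distributed table picks a shard from the remainder of the sharding expression divided by the total shard weight; the sketch below assumes equal weights, uses the trace ID as the sharding key (an assumption, not something the thread settles) and uses `zlib.crc32` purely as a stand-in hash. `pick_shard` and `route` are hypothetical names.

```python
import zlib


def pick_shard(trace_id: str, num_shards: int) -> int:
    """Route a row by the remainder of a hash of its sharding key.

    crc32 is only a stand-in; a real deployment would use a ClickHouse
    hash function in the Distributed table's sharding expression.
    """
    return zlib.crc32(trace_id.encode("utf-8")) % num_shards


def route(rows, num_shards):
    """Split (trace_id, payload) rows into per-shard batches."""
    shards = [[] for _ in range(num_shards)]
    for trace_id, payload in rows:
        shards[pick_shard(trace_id, num_shards)].append((trace_id, payload))
    return shards
```

Keying on the trace ID keeps every span of a trace on one shard, so a lookup by trace ID only has to touch a single shard's data.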
@Slach yes I saw that, but if you have hundreds of nodes it can be an issue I would assume. We run thousands of nodes of ES. Bigger issue would be supporting ClickHouse in Kibana, but that's for another project :) |
@mcspring I'm not actively working on it at the moment, since it doesn't require any maintenance from me. Feel free to submit a PR and I'll get to it eventually. |
@jpkrohling is this still the case?
I tried finding an issue for this, but could not find it. Is there an issue for this? Thanks! |
@otisg don't know about the issue, but the approach is that we're rebuilding Jaeger backend on top of OpenTelemetry collector, which has a mechanism of combining various packages (even from different repos) at build time without explicit code dependencies in the core. That means the storage implementations can directly implement storage interface and don't have to go through much less efficient grpc-plugin interface. |
I don't think we have an issue tracking this specific feature, but we know it will be there for v2, which is being tracked here: https://github.com/jaegertracing/jaeger/milestone/15 |
Based on personal experience, I would recommend adding ClickHouse as one of the core storage options for Jaeger (like Elasticsearch and Cassandra); if not, we should probably port Ivan's work as a gRPC plugin. There are a few benefits I can think of:
Uber's migration of their logging pipeline from ES to clickhouse[0] is a very good example of clickhouse performance. |
I also like the idea of adding the plugin as one of the core storage options for Jaeger. We have been using Ivan's plugin for half a year. Using ClickHouse as storage is almost invisible in terms of infrastructure resources. Docker images with an updated version of Jaeger and a startup example can be found here |
I have migrated https://github.com/bobrik/jaeger/tree/ivan/clickhouse/plugin/storage/clickhouse to a storage plugin. The source code is available here: https://github.com/pavolloffay/jaeger-clickhouse. For now I am not planning on maintaining it; I built it for our internal purposes. However, if there is interest and somebody would like to help maintain it, I am happy to chat about it. The repository contains instructions on how to run it, and the code was tested locally (see readme). When I run the code on more data I get |
@pavolloffay thanks a lot for your great efforts ;) I hope you will bring ClickHouse to first-class citizen storage in Jaeger |
Folks, did you have to increase the maximum number of queries in ClickHouse when running Jaeger?
I am getting
@bobrik / @Slach what does your Jaeger deployment look like? Are you using Kafka as well? |
All ingress in our deployment comes from Kafka with large batches (10k to 100k). We have 512 as the concurrent query limit, but the metrics show under 20 concurrent queries per node in a 3 node cluster. |
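Since ClickHouse prefers few large inserts over many small ones, a writer typically buffers spans and flushes them by size or age. Below is a minimal Python sketch of that idea; `SpanBatcher`, its parameters and the `flush_fn` callback are hypothetical and are not taken from the plugin, and the thresholds are placeholders rather than recommendations.

```python
import time


class SpanBatcher:
    """Buffer spans and hand them to `flush_fn` in large batches.

    A batch is flushed when it reaches `max_rows`, or when its oldest
    span has waited `max_age_s` seconds (checked on each `add`).
    """

    def __init__(self, flush_fn, max_rows=10_000, max_age_s=5.0,
                 clock=time.monotonic):
        self.flush_fn = flush_fn      # e.g. a function doing one bulk INSERT
        self.max_rows = max_rows
        self.max_age_s = max_age_s
        self.clock = clock            # injectable for testing
        self.rows = []
        self.first_row_at = None

    def add(self, span):
        if not self.rows:
            self.first_row_at = self.clock()
        self.rows.append(span)
        too_big = len(self.rows) >= self.max_rows
        too_old = self.clock() - self.first_row_at >= self.max_age_s
        if too_big or too_old:
            self.flush()

    def flush(self):
        """Send whatever is buffered; call once more on shutdown."""
        if self.rows:
            batch, self.rows = self.rows, []
            self.flush_fn(batch)
```

Because the age check only runs inside `add`, a real writer would also need a timer or a final `flush()` on shutdown so that a quiet stream does not leave spans sitting in memory.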
@bobrik Could you please tell me how you used jaeger-tracegen for this? I can't figure out how to push generated spans to either Jaeger or ClickHouse. |
Are you having trouble running tracegen itself? It should send data via UDP to a local agent, which would be connected to a remote collector (unless using a local all-in-one). |
I made it now, thank you! |
@pavolloffay Here is an example of tables with buffers. |
Folks, I have moved the clickhouse-plugin to the jaegertracing org: https://github.com/jaegertracing/jaeger-clickhouse Note that support depends only on the community. It's not officially supported. However, the project is in a good state thanks to @EinKrebs's excellent work. Please move any further discussion to that repository. |
Also note that the latest version of the Jaeger operator supports the storage grpc-plugin. |
The code I copied also sets SPAN_STORAGE_TYPE=clickhouse, but it still reports an error. Use "jaeger-ingester [command] --help" for more information about a command. |
Can you tell exactly which code? |
@EinKrebs @pavolloffay |
I don't know about jaeger-clickhouse; I personally have not maintained it for 2 years now. |
@pavolloffay Is this project still being maintained? |
@yurishkuro Is there any replacement for grpc-plugin now for using clickhouse as a backend or is it possible to somehow configure query to work with clickhouse? |
@selfing12 maybe this discussion can help you |
Yes, as I understand it, this is the last version in which this bundle works. What should we do with 1.60+? |
ClickHouse, an open-source column-oriented DBMS designed initially for real-time analytics and mostly write-once/read-many big-data use cases, can be used as a very efficient log and trace storage.
Meta issue: #638 Additional storage backends