This document answers the following questions:
- How data is stored on disk for MergeTree engine family tables
- What are `parts`, `granules` and `marks`
- How and why choosing the correct `ORDER BY` and `PARTITION BY` in table definitions affects query performance
- How to use `EXPLAIN` to understand what ClickHouse is doing
- The difference between `PREWHERE` and `WHERE`
- Data compression
Introduction to MergeTree
The "Why is ClickHouse so fast?" page of the ClickHouse docs states:
ClickHouse was initially built as a prototype to do just a single task well: to filter and aggregate data as fast as possible.
Rather than forcing a single tool to solve every possible task, ClickHouse provides specialized "engines" that each solve specific problems.
MergeTree engine family tables are intended for ingesting large amounts of data, storing that data efficiently, and running analytical queries on it.
How MergeTree stores data
Consider the following (simplified) table for storing sensor events:
```sql
CREATE TABLE sensor_values (
    timestamp DateTime,
    site_id UInt32,
    event VARCHAR,
    uuid UUID,
    metric_value Int32
)
ENGINE = MergeTree()
ORDER BY (site_id, toStartOfDay(timestamp), event, uuid)
SETTINGS index_granularity = 8192
```
Data for this table would be stored in parts, each part being a separate directory on disk. Data within a part is always compressed and sorted by the table's ORDER BY clause.
Parts can be Wide or Compact depending on their size. We'll mostly be dealing with Wide parts in day-to-day operations.
Wide parts are large and store each column in a separate binary data file, sorted and compressed.
ClickHouse also stores a sparse index for each part. A group of consecutive rows whose size equals the index_granularity setting is called a granule. For every granule, the primary index stores the ORDER BY key values of the granule's first row, and a mark for each column records where that granule begins in the column's data file.
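To make granules concrete, here is a minimal Python sketch of a sparse index. It assumes a single-column sort key and a toy granularity of 4 rows. It is a simplified model, not ClickHouse's implementation, and the function names are hypothetical. The index keeps one key per granule, so a lookup can only narrow the search down to whole granules. That is why a point lookup reads up to index_granularity rows.

```python
from bisect import bisect_left, bisect_right

GRANULARITY = 4  # tiny stand-in for index_granularity (8192 by default)


def build_sparse_index(sorted_keys, granularity=GRANULARITY):
    """Keep only the key of the first row of each granule (one index entry per granule)."""
    return [sorted_keys[i] for i in range(0, len(sorted_keys), granularity)]


def candidate_granules(index, lo, hi):
    """Granule numbers that *may* contain keys in [lo, hi].

    Granule g holds keys between index[g] and index[g + 1] (inclusive, since
    equal keys can straddle a boundary), so the index can only rule granules out.
    """
    start = max(bisect_left(index, lo) - 1, 0)
    end = bisect_right(index, hi) - 1
    return list(range(start, end + 1))


keys = [1, 2, 3, 5, 5, 5, 5, 7, 8, 9, 10, 11]  # already sorted, like data in a part
index = build_sparse_index(keys)                # [1, 5, 8]
granules = candidate_granules(index, 5, 5)     # [0, 1]: key 5 straddles two granules
print(f"read {len(granules) * GRANULARITY} rows to find {keys.count(5)} matches")
```

This prints `read 8 rows to find 4 matches`: whole granules are read even when only some of their rows match.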
💡 For better performance when running queries, it is not recommended to set `index_granularity` too low. The default value for engines in the `MergeTree` family is 8192. An implication of this is that accessing data by primary key (in this case the `ORDER BY` clause is equivalent to the primary key) will not read just one row, but rather up to `index_granularity` number of rows. This is acceptable given ClickHouse is meant to perform well with aggregations, rather than point lookups.
Diving deeper into data-on-disk for a Wide part
This section assumes you're using a Docker-based ClickHouse installation and have clickhouse-client running.
Seeding data
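```sql
-- site_id is generated as UInt8 (values 0..255) and converted to the table's UInt32 column on insert.
-- generateRandom arguments: column structure, random seed (NULL = random), max string length.
INSERT INTO sensor_values
SELECT *
FROM generateRandom('timestamp DateTime, site_id UInt8, event VARCHAR, uuid UUID, metric_value Int32', NULL, 10)
LIMIT 200000000
```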
Looking at part data
The system.parts table contains metadata about every part.
To find out each part's type, its size and where it is located on disk, run the following query:
```sql
SELECT
    name,
    part_type,
    rows,
    marks,
    formatReadableSize(bytes_on_disk),
    formatReadableSize(data_compressed_bytes),
    formatReadableSize(data_uncompressed_bytes),
    formatReadableSize(marks_bytes),
    path
FROM system.parts
WHERE active AND table = 'sensor_values'
FORMAT Vertical
```
The result might look something like this:
```
Row 1:
──────
name: all_12_17_1
part_type: Wide
rows: 6291270
marks: 769
formatReadableSize(bytes_on_disk): 476.07 MiB
formatReadableSize(data_compressed_bytes): 475.92 MiB
formatReadableSize(data_uncompressed_bytes): 474.00 MiB
formatReadableSize(marks_bytes): 90.12 KiB
path: /var/lib/clickhouse/store/267/267cd730-33ca-4e43-8a84-e4f0786e364b/all_12_17_1/
```
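The marks count lines up with simple granule arithmetic. The Python sketch below makes two assumptions on our side. First, every granule holds a full index_granularity rows, which is reasonable for these small rows. Second, ClickHouse writes one extra final mark at the end of the part. Together they reproduce the 769 marks shown above. The helper name is hypothetical.

```python
import math


def expected_marks(rows: int, granularity: int = 8192, final_mark: bool = True) -> int:
    """Number of marks for a part, assuming full granules plus an optional final mark."""
    granules = math.ceil(rows / granularity)  # the last granule may be partially filled
    return granules + (1 if final_mark else 0)


rows = 6_291_270                   # from system.parts above
print(math.ceil(rows / 8192))      # 768 granules
print(expected_marks(rows))        # 769 marks, matching the `marks` column
```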
Inspecting data on disk
```
⟩ docker exec -it posthog_clickhouse_1 ls -lhS /var/lib/clickhouse/store/267/267cd730-33ca-4e43-8a84-e4f0786e364b/all_12_17_1/
total 477M
-rw-r----- 1 clickhouse clickhouse 308M Nov 2 07:33 event.bin
-rw-r----- 1 clickhouse clickhouse  97M Nov 2 07:33 uuid.bin
-rw-r----- 1 clickhouse clickhouse  25M Nov 2 07:33 metric_value.bin
-rw-r----- 1 clickhouse clickhouse  25M Nov 2 07:33 timestamp.bin
-rw-r----- 1 clickhouse clickhouse  25M Nov 2 07:33 site_id.bin
-rw-r----- 1 clickhouse clickhouse  58K Nov 2 07:33 primary.idx
-rw-r----- 1 clickhouse clickhouse  19K Nov 2 07:33 event.mrk2
-rw-r----- 1 clickhouse clickhouse  19K Nov 2 07:33 metric_value.mrk2
-rw-r----- 1 clickhouse clickhouse  19K Nov 2 07:33 site_id.mrk2
-rw-r----- 1 clickhouse clickhouse  19K Nov 2 07:33 timestamp.mrk2
-rw-r----- 1 clickhouse clickhouse  19K Nov 2 07:33 uuid.mrk2
-rw-r----- 1 clickhouse clickhouse  494 Nov 2 07:33 checksums.txt
-rw-r----- 1 clickhouse clickhouse  123 Nov 2 07:33 columns.txt
-rw-r----- 1 clickhouse clickhouse   10 Nov 2 07:33 default_compression_codec.txt
-rw-r----- 1 clickhouse clickhouse    7 Nov 2 07:33 count.txt
```
What are these files?
- For every column, there's a `{column_name}.bin` file containing the compressed (LZ4 by default) data for that column. These take up most of the space.
- For every column, there's a `{column_name}.mrk2` file containing an index with the data needed to locate each granule in the `{column_name}.bin` file.
- `primary.idx` contains the ORDER BY column values for each granule. This is loaded into memory during queries.
- `checksums.txt`, `columns.txt`, `default_compression_codec.txt` and `count.txt` contain metadata about this part.
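As a rough illustration of how a mark locates a granule, the sketch below decodes .mrk2 data. It assumes each mark is 24 bytes holding three little-endian UInt64 values:

- the offset of the compressed block in the .bin file
- the offset inside the decompressed block
- the number of rows in the granule

This layout is consistent with the numbers above. 769 marks of 24 bytes each is 18,456 bytes per column, and five columns give the 90.12 KiB of marks_bytes. Still, treat the layout as an assumption. Newer ClickHouse versions can also compress mark files, which this sketch doesn't handle. The offsets in the example are made up.

```python
import struct

# Assumed .mrk2 record: three little-endian unsigned 64-bit integers (24 bytes).
MARK = struct.Struct("<QQQ")


def read_marks(raw: bytes):
    """Decode .mrk2 bytes into (offset_in_compressed_file, offset_in_decompressed_block, rows)."""
    if len(raw) % MARK.size:
        raise ValueError("data length is not a multiple of the mark size")
    return [MARK.unpack_from(raw, pos) for pos in range(0, len(raw), MARK.size)]


# Hypothetical marks: granules 0 and 1 share one compressed block, granule 2 starts
# a new block, and the last entry is the final (empty) mark.
raw = (MARK.pack(0, 0, 8192) + MARK.pack(0, 32768, 8192)
       + MARK.pack(40960, 0, 8192) + MARK.pack(81920, 0, 0))
for compressed_off, decompressed_off, granule_rows in read_marks(raw):
    print(compressed_off, decompressed_off, granule_rows)

# Size check against the part above: 18456 bytes per column, ~90.12 KiB for 5 columns.
print(769 * MARK.size, 5 * 769 * MARK.size / 1024)
```

On a real, uncompressed mark file the call would look like `read_marks(open("site_id.mrk2", "rb").read())`.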
You can read more on the exact structure of these files and how they're used in the ClickHouse Index Design documentation.
What does the Merge stand for?
In every system, data must be ingested and kept up-to-date somehow. When data is inserted into MergeTree tables, each insert creates one or multiple parts for the data inserted.
Having many small parts would be disadvantageous for many reasons, from query performance to storage, so ClickHouse regularly merges small parts together in the background until they reach a maximum size.
A merge combines two or more parts into a new one, much like the merge step of merge sort, and then atomically replaces the source parts with the result.
Merges can be monitored using the system.merges table.
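Every part is already sorted by the ORDER BY key, so merging parts never needs a full re-sort. A single pass over all inputs suffices. Below is a minimal Python sketch of that merge step, using the standard library's `heapq.merge` as a stand-in. ClickHouse's real merge also rewrites compressed columns, marks and the primary index and swaps parts atomically; none of that is modeled here. The rows and sort key are hypothetical.

```python
import heapq


def merge_parts(*parts, key):
    """Merge parts that are each sorted by `key` into one sorted part in a single pass."""
    return list(heapq.merge(*parts, key=key))


def sort_key(row):
    """Hypothetical (site_id, day) sort key."""
    return row[0], row[1]


part_a = [(1, 10, "pageview"), (3, 12, "click")]
part_b = [(2, 11, "signup"), (3, 11, "pageview")]
part_c = [(1, 11, "click")]

merged = merge_parts(part_a, part_b, part_c, key=sort_key)
for row in merged:
    print(row)
```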
Query execution
Aggregation supported by ORDER BY
Our sensor_values table is set up in a way that queries similar to the following are really fast to execute.
```sql
SELECT
    toStartOfDay(timestamp),
    event,
    sum(metric_value) AS total_metric_value
FROM sensor_values
WHERE site_id = 233 AND timestamp > '2010-01-01' AND timestamp < '2023-01-01'
GROUP BY toStartOfDay(timestamp), event
ORDER BY total_metric_value DESC
LIMIT 20
```
Executing this reports:
```
20 rows in set. Elapsed: 0.042 sec. Processed 90.11 thousand rows, 3.54 MB (2.13 million rows/s., 83.60 MB/s.)
```
Why is it fast? Because ClickHouse:
- leverages the table's `ORDER BY` clause (`ORDER BY (site_id, toStartOfDay(timestamp), event, uuid)`) to skip reading a lot of data
- is fast and efficient at I/O and aggregation
Let's use EXPLAIN to dig into how this query uses the primary index.
```sql
EXPLAIN indexes = 1, header = 1
SELECT
    toStartOfDay(timestamp),
    event,
    sum(metric_value) AS total_metric_value
FROM sensor_values
WHERE site_id = 233 AND timestamp > '2010-01-01' AND timestamp < '2023-01-01'
GROUP BY toStartOfDay(timestamp), event
ORDER BY total_metric_value DESC
LIMIT 20
FORMAT LineAsString
```
Full `EXPLAIN` output:
```
Expression (Projection)
Header: toStartOfDay(timestamp) DateTime
        event String
        total_metric_value Int64
  Limit (preliminary LIMIT (without OFFSET))
  Header: toStartOfDay(timestamp) DateTime
          event String
          sum(metric_value) Int64
    Sorting (Sorting for ORDER BY)
    Header: toStartOfDay(timestamp) DateTime
            event String
            sum(metric_value) Int64
      Expression (Before ORDER BY)
      Header: toStartOfDay(timestamp) DateTime
              event String
              sum(metric_value) Int64
        Aggregating
        Header: toStartOfDay(timestamp) DateTime
                event String
                sum(metric_value) Int64
          Expression (Before GROUP BY)
          Header: event String
                  metric_value Int32
                  toStartOfDay(timestamp) DateTime
            Filter (WHERE)
            Header: timestamp DateTime
                    event String
                    metric_value Int32
              SettingQuotaAndLimits (Set limits and quota after reading from storage)
              Header: and(greater(timestamp, '2010-01-01'), less(timestamp, '2023-01-01')) UInt8
                      timestamp DateTime
                      site_id UInt32
                      event String
                      metric_value Int32
                ReadFromMergeTree
                Header: and(greater(timestamp, '2010-01-01'), less(timestamp, '2023-01-01')) UInt8
                        timestamp DateTime
                        site_id UInt32
                        event String
                        metric_value Int32
                Indexes:
                  PrimaryKey
                    Keys:
                      site_id
                      toStartOfDay(timestamp)
                    Condition: and(and((toStartOfDay(timestamp) in (-Inf, 1672531200]), (toStartOfDay(timestamp) in [1262304000, +Inf))), and((site_id in [233, 233]), and((toStartOfDay(timestamp) in (-Inf, 1672531200]), (toStartOfDay(timestamp) in [1262304000, +Inf)))))
                    Parts: 2/2
                    Granules: 11/24415
```
The full EXPLAIN output is dense, but the most important part is also the most deeply nested one:
```
ReadFromMergeTree
Header: and(greater(timestamp, '2010-01-01'), less(timestamp, '2023-01-01')) UInt8
        timestamp DateTime
        site_id UInt32
        event String
        metric_value Int32
Indexes:
  PrimaryKey
    Keys:
      site_id
      toStartOfDay(timestamp)
    Condition: and(and((toStartOfDay(timestamp) in (-Inf, 1672531200]), (toStartOfDay(timestamp) in [1262304000, +Inf))), and((site_id in [233, 233]), and((toStartOfDay(timestamp) in (-Inf, 1672531200]), (toStartOfDay(timestamp) in [1262304000, +Inf)))))
    Parts: 2/2
    Granules: 11/24415
```
At the start of the query, ClickHouse loaded the primary index of each part into memory. This output shows that the query first used the primary key to filter on the site_id and timestamp values stored in the index.
This told it that only 11 out of 24415 granules (about 0.05%) could contain relevant data.
From there it read just those 11 granules (11 * 8192 = 90,112 rows, which matches the 90.11 thousand rows processed above) from the timestamp, site_id, event and metric_value columns and did the rest of the filtering and aggregation on that data alone.
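The primary key is a tuple compared column by column. So a condition like site_id = 233 plus a day range maps to one contiguous key range, and the sparse index only has to find its two ends. Here is a small Python sketch of that idea. The random (site_id, day) pairs and the granularity of 8 rows are assumptions for illustration. The real index also has to deal with the remaining key columns and with multiple parts.

```python
import random
from bisect import bisect_left, bisect_right

GRANULARITY = 8  # toy stand-in for 8192

random.seed(0)
# Hypothetical (site_id, day) pairs, sorted like the (site_id, toStartOfDay(timestamp)) key prefix.
rows = sorted((random.randrange(256), random.randrange(1000)) for _ in range(20_000))
index = [rows[i] for i in range(0, len(rows), GRANULARITY)]  # first key of each granule

# site_id = 233 AND day BETWEEN 100 AND 900 is the key range [(233, 100), (233, 900)].
lo, hi = (233, 100), (233, 900)
start = max(bisect_left(index, lo) - 1, 0)
end = bisect_right(index, hi) - 1
selected = range(start, end + 1)

matches = sum(lo <= row <= hi for row in rows)
print(f"{matches} matching rows; read {len(selected)} of {len(index)} granules")
```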
See this documentation for a guide on how to choose ORDER BY.
"Point queries" not supported by ORDER BY
Consider this query:
```sql
SELECT * FROM sensor_values WHERE uuid = '69028f26-768f-afef-1816-521b22d281ca'
```
Executing this query reports:
```
1 row in set. Elapsed: 0.703 sec. Processed 200.00 million rows, 3.20 GB (304.43 million rows/s., 4.87 GB/s.)
```
While the overall execution time of this query is not bad thanks to fast I/O, it had to read about 2200x as many rows (200 million vs 90.11 thousand) and roughly 900x as many bytes (3.20 GB vs 3.54 MB) as the previous query. As the dataset or column sizes grow, this performance would get dramatically worse.
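Why is this query slower? Because uuid is the last column of our ORDER BY key, the primary index can't narrow down which granules contain a given uuid. ClickHouse needs to read the whole table to find a single record, and because the query selects every column, it reads all of them.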
ClickHouse provides some ways to make this faster (e.g. Projections) but in general these require extra disk space or have other trade-offs.
Thus, it's important to make sure the ClickHouse schema is aligned with queries that are being executed.
PARTITION BY
Another tool to make queries faster is PARTITION BY. Consider the updated table definition:
```sql
CREATE TABLE sensor_values (
    timestamp DateTime,
    site_id UInt32,
    event VARCHAR,
    uuid UUID,
    metric_value Int32
)
ENGINE = MergeTree()
PARTITION BY intDiv(toYear(timestamp), 10)
ORDER BY (site_id, toStartOfDay(timestamp), event, uuid)
SETTINGS index_granularity = 8192
```
Here, ClickHouse would create one partition per 10 years of data, which in some cases lets it skip reading even the primary index.
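The partition expression is plain integer arithmetic. The Python sketch below works out which partitions a time range can touch. It ignores time zones, although toYear() in ClickHouse does depend on the server or column time zone, and the helper names are hypothetical.

```python
from datetime import datetime


def partition_id(ts: datetime) -> int:
    """Python equivalent of PARTITION BY intDiv(toYear(timestamp), 10)."""
    return ts.year // 10  # intDiv on non-negative ints is floor division


def partitions_for_range(start: datetime, end: datetime) -> list:
    """Partition IDs a query with start <= timestamp <= end may need to read."""
    return list(range(partition_id(start), partition_id(end) + 1))


# timestamp > '2010-01-01' AND timestamp < '2023-01-01'
print(partitions_for_range(datetime(2010, 1, 1, 0, 0, 1), datetime(2022, 12, 31, 23, 59, 59)))
```

This prints `[201, 202]`, the same bounds that appear in the partition condition of the EXPLAIN output for the partitioned table.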
In the underlying data, each part would belong to a single partition and only parts within a partition would get merged.
One additional benefit of partitioning by a value derived from timestamp is that if most queries touch recent data, you can also set up rules to automatically move older parts and partitions to cheaper storage, or drop them entirely.
Query analysis
Let's EXPLAIN the same query as before, this time against the partitioned table:
```sql
SELECT
    toStartOfDay(timestamp),
    event,
    sum(metric_value) AS total_metric_value
FROM sensor_values
WHERE site_id = 233 AND timestamp > '2010-01-01' AND timestamp < '2023-01-01'
GROUP BY toStartOfDay(timestamp), event
ORDER BY total_metric_value DESC
LIMIT 20
```
Full `EXPLAIN` output:
```
Expression (Projection)
Header: toStartOfDay(timestamp) DateTime
        event String
        total_metric_value Int64
  Limit (preliminary LIMIT (without OFFSET))
  Header: toStartOfDay(timestamp) DateTime
          event String
          sum(metric_value) Int64
    Sorting (Sorting for ORDER BY)
    Header: toStartOfDay(timestamp) DateTime
            event String
            sum(metric_value) Int64
      Expression (Before ORDER BY)
      Header: toStartOfDay(timestamp) DateTime
              event String
              sum(metric_value) Int64
        Aggregating
        Header: toStartOfDay(timestamp) DateTime
                event String
                sum(metric_value) Int64
          Expression (Before GROUP BY)
          Header: event String
                  metric_value Int32
                  toStartOfDay(timestamp) DateTime
            Filter (WHERE)
            Header: timestamp DateTime
                    event String
                    metric_value Int32
              SettingQuotaAndLimits (Set limits and quota after reading from storage)
              Header: and(greater(timestamp, '2010-01-01'), less(timestamp, '2023-01-01')) UInt8
                      timestamp DateTime
                      site_id UInt32
                      event String
                      metric_value Int32
                ReadFromMergeTree
                Header: and(greater(timestamp, '2010-01-01'), less(timestamp, '2023-01-01')) UInt8
                        timestamp DateTime
                        site_id UInt32
                        event String
                        metric_value Int32
                Indexes:
                  MinMax
                    Keys:
                      timestamp
                    Condition: and(and((timestamp in (-Inf, 1672531199]), (timestamp in [1262304001, +Inf))), and((timestamp in (-Inf, 1672531199]), (timestamp in [1262304001, +Inf))))
                    Parts: 2/14
                    Granules: 3589/24421
                  Partition
                    Keys:
                      intDiv(toYear(timestamp), 10)
                    Condition: and(and((intDiv(toYear(timestamp), 10) in (-Inf, 202]), (intDiv(toYear(timestamp), 10) in [201, +Inf))), and((intDiv(toYear(timestamp), 10) in (-Inf, 202]), (intDiv(toYear(timestamp), 10) in [201, +Inf))))
                    Parts: 2/2
                    Granules: 3589/3589
                  PrimaryKey
                    Keys:
                      site_id
                      toStartOfDay(timestamp)
                    Condition: and(and((toStartOfDay(timestamp) in (-Inf, 1672531200]), (toStartOfDay(timestamp) in [1262304000, +Inf))), and((site_id in [233, 233]), and((toStartOfDay(timestamp) in (-Inf, 1672531200]), (toStartOfDay(timestamp) in [1262304000, +Inf)))))
                    Parts: 2/2
                    Granules: 12/3589
```
The relevant part of EXPLAIN is again nested deep within:
```
ReadFromMergeTree
Header: and(greater(timestamp, '2010-01-01'), less(timestamp, '2023-01-01')) UInt8
        timestamp DateTime
        site_id UInt32
        event String
        metric_value Int32
Indexes:
  MinMax
    Keys:
      timestamp
    Condition: and(and((timestamp in (-Inf, 1672531199]), (timestamp in [1262304001, +Inf))), and((timestamp in (-Inf, 1672531199]), (timestamp in [1262304001, +Inf))))
    Parts: 2/14
    Granules: 3589/24421
  Partition
    Keys:
      intDiv(toYear(timestamp), 10)
    Condition: and(and((intDiv(toYear(timestamp), 10) in (-Inf, 202]), (intDiv(toYear(timestamp), 10) in [201, +Inf))), and((intDiv(toYear(timestamp), 10) in (-Inf, 202]), (intDiv(toYear(timestamp), 10) in [201, +Inf))))
    Parts: 2/2
    Granules: 3589/3589
  PrimaryKey
    Keys:
      site_id
      toStartOfDay(timestamp)
    Condition: and(and((toStartOfDay(timestamp) in (-Inf, 1672531200]), (toStartOfDay(timestamp) in [1262304000, +Inf))), and((site_id in [233, 233]), and((toStartOfDay(timestamp) in (-Inf, 1672531200]), (toStartOfDay(timestamp) in [1262304000, +Inf)))))
    Parts: 2/2
    Granules: 12/3589
```
What this tells us is that ClickHouse:
- First leverages an internal MinMax index on `timestamp` to whittle the candidates down to 2 of 14 parts and 3589 of 24421 granules
- Then tries to filter via the partition key, which doesn't narrow things down any further
- Then loads and leverages the primary key as before to narrow the data down to 12 granules
- Lastly, reads, filters and aggregates the data in those 12 granules
The benefit here is that ClickHouse could skip reading the primary key index for most of the parts (12 of 14) that did not contain relevant data. Whether and how much this speeds up the query, however, depends on the size of the dataset.
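The MinMax step works purely on per-part metadata. Each part records the minimum and maximum of its partition key columns (here timestamp), so a part can be skipped without opening its primary index. Below is a Python sketch with hypothetical parts whose names, ranges and granule counts are made up. The query bounds are the ones in the MinMax condition of the EXPLAIN output.

```python
from dataclasses import dataclass


@dataclass
class Part:
    name: str
    min_ts: int    # smallest timestamp in the part (Unix seconds)
    max_ts: int    # largest timestamp in the part
    granules: int


def prune_parts(parts, lo, hi):
    """Keep only parts whose [min_ts, max_ts] range overlaps the query range [lo, hi]."""
    return [p for p in parts if p.max_ts >= lo and p.min_ts <= hi]


parts = [
    Part("1970s", 0, 315_532_799, 9000),
    Part("2010s", 1_262_304_000, 1_577_836_799, 2000),
    Part("2020s", 1_577_836_800, 1_893_455_999, 1600),
]
# MinMax condition bounds: timestamp in [1262304001, 1672531199]
kept = prune_parts(parts, 1_262_304_001, 1_672_531_199)
print([p.name for p in kept], sum(p.granules for p in kept), "granules left to check")
```

Only the surviving parts go on to the partition and primary key checks. That is why the later steps in the EXPLAIN output report `Parts: 2/2`.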
Choosing a good PARTITION BY
Use partitions wisely: each INSERT should ideally touch only 1-2 partitions, and too many partitions will cause replication issues or turn out to be useless for filtering.
Loading the primary index/marks file might not be the bottleneck you expect, so be sure to benchmark different schemas against each other.
See the following Altinity documentation for more guidance:
- How to pick an ORDER BY / PRIMARY KEY / PARTITION BY for the MergeTree-family table
- How much is too much? ClickHouse limitations
Other notes on MergeTree
Data is expensive to update
Updating data in ClickHouse is expensive and analogous to a schema migration.
For example, to update an event's properties, ClickHouse frequently needs to:
- Scan all the data to find which parts contain the relevant rows. This often isn't covered by the `ORDER BY`, so it's quite expensive.
- Rewrite the whole part containing that data. This could potentially mean rewriting up to 150GB of data for a single update.
This makes things operationally hard. We mitigate this by:
- Writing duplicated rows for new data, using other table engines (e.g. ReplacingMergeTree) and accounting for this duplication in our queries.
- Batching up GDPR or other data deletions and doing them on a schedule rather than immediately.
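Roughly speaking, when parts are merged, ReplacingMergeTree keeps one row per sorting key. It keeps the row with the highest version, or the last inserted row when versions tie. Merges run at unpredictable times, so queries still have to tolerate duplicates, for example via FINAL or argMax. The Python sketch below models only that deduplication rule. The (uuid, version, properties) row shape is hypothetical.

```python
def deduplicate(rows, key, version):
    """Keep one row per key: the highest version wins, and on ties the later row wins."""
    latest = {}
    for row in rows:  # rows in insertion order
        k = key(row)
        if k not in latest or version(row) >= version(latest[k]):
            latest[k] = row
    return list(latest.values())


rows = [
    ("uuid-1", 1, {"plan": "free"}),
    ("uuid-2", 1, {"plan": "free"}),
    ("uuid-1", 2, {"plan": "paid"}),  # the "update": a newer duplicate row
]
print(deduplicate(rows, key=lambda r: r[0], version=lambda r: r[1]))
```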
No query planner
ClickHouse doesn't have a query planner in the sense PostgreSQL or other databases do.
On the one hand, you often end up fighting the query planner in other databases. If we know how ClickHouse works internally and can develop that into intuition for how SQL is executed, we're well-equipped to deal with performance issues as they arise.
On the other, this means that we'll need to be careful writing SQL as small changes can have huge performance implications.
Examples:
- For best performance, ClickHouse requires you to "push" predicates in WHERE clauses down into subqueries rather than filtering in the outermost query.
- In the `sensor_values` queries above, the execution plan would have been slightly more optimal if the filter condition had been on `toYear(timestamp)` rather than `timestamp`.
One notable exception to "no query planner" is that ClickHouse often moves predicates from WHERE into PREWHERE. Filters in PREWHERE are executed first, and ClickHouse moves the columns it considers "cheaper" or "more selective" into it. However, putting the wrong column (e.g. a fat column containing JSON) in PREWHERE can cause performance to tank.
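The idea behind PREWHERE can be modeled in a few lines of Python. Read the filter column first, then read the remaining columns only for granules that contain a match. This is a simplified sketch: the event/properties columns, the 4-row granularity and the "values read" counter are illustrative assumptions. The "plain WHERE" function is a model only, since ClickHouse moves suitable filters into PREWHERE automatically. The sketch also shows why a fat column in PREWHERE hurts: that column is read in full during the first phase.

```python
GRANULARITY = 4  # toy granule size


def read_with_prewhere(columns, filter_col, pred, other_cols):
    """Phase 1 reads filter_col fully; phase 2 reads other_cols only in granules with matches."""
    n = len(columns[filter_col])
    values_read = n
    result = []
    for start in range(0, n, GRANULARITY):
        stop = min(start + GRANULARITY, n)
        hits = [i for i in range(start, stop) if pred(columns[filter_col][i])]
        if not hits:
            continue  # this granule's other columns are never read
        values_read += (stop - start) * len(other_cols)
        result += [{c: columns[c][i] for c in (filter_col, *other_cols)} for i in hits]
    return result, values_read


def read_with_where(columns, filter_col, pred, other_cols):
    """Plain WHERE in this model: read every column in full, then filter."""
    n = len(columns[filter_col])
    values_read = n * (1 + len(other_cols))
    result = [{c: columns[c][i] for c in (filter_col, *other_cols)}
              for i in range(n) if pred(columns[filter_col][i])]
    return result, values_read


def is_signup(event):
    return event == "signup"


events = ["pageview"] * 8 + ["signup"] + ["pageview"] * 3
columns = {"event": events, "properties": [f'{{"i": {i}}}' for i in range(12)]}
print(read_with_prewhere(columns, "event", is_signup, ["properties"])[1])  # 16 values read
print(read_with_where(columns, "event", is_signup, ["properties"])[1])     # 24 values read
```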
Read more on PREWHERE in the ClickHouse docs.
Data compression
Because each column is stored separately, consecutive values in a column are often similar or identical, which makes the data compress very well. At PostHog we frequently see uncompressed / compressed ratios of 20x-40x for JSON columns and 300x-2000x for sparse small columns.
Compression ratios have a direct impact on query performance: I/O is often the bottleneck, so highly compressed data can be read from disk faster, at the cost of more CPU work for decompression.
By default, columns are compressed with the LZ4 algorithm. We've had good success using ZSTD(3) for storing JSON columns; see benchmarks for more information.
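To see why similar neighboring values matter, the Python sketch below compresses two hypothetical columns: a sorted column with only a few distinct values, and a column of random values. It uses zlib (DEFLATE) from the standard library as a stand-in, since LZ4 and ZSTD aren't in Python's standard library, so the exact ratios won't match ClickHouse's. The same effect explains why the randomly generated sensor_values data earlier barely compressed at all: 475.92 MiB compressed vs 474.00 MiB uncompressed.

```python
import random
import zlib

random.seed(42)
N = 100_000


def compression_ratio(values):
    """Uncompressed / compressed size of the values stored back to back."""
    raw = "\n".join(values).encode()
    return len(raw) / len(zlib.compress(raw))


# Hypothetical columns: a sorted event-name column vs. random 64-bit hex ids.
sorted_events = sorted(random.choice(["pageview", "click", "signup"]) for _ in range(N))
random_ids = [f"{random.getrandbits(64):016x}" for _ in range(N)]

print(f"sorted events: {compression_ratio(sorted_events):.0f}x")
print(f"random ids:    {compression_ratio(random_ids):.1f}x")
```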
Another tip is to use ClickHouse's LowCardinality data type modifier for columns that hold only a small number of distinct values, such as a country name.
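Conceptually, LowCardinality is dictionary encoding. Each distinct value is stored once, and the column itself holds small integer positions into that dictionary. Below is a Python sketch of the idea with hypothetical helper names. ClickHouse's on-disk format differs in the details.

```python
def dictionary_encode(values):
    """Return (dictionary of distinct values, list of positions into it)."""
    positions = {}
    codes = [positions.setdefault(v, len(positions)) for v in values]
    return list(positions), codes


def dictionary_decode(dictionary, codes):
    return [dictionary[c] for c in codes]


countries = ["Estonia", "Germany", "Estonia", "United States", "Germany", "Estonia"]
dictionary, codes = dictionary_encode(countries)
print(dictionary)  # ['Estonia', 'Germany', 'United States']
print(codes)       # [0, 1, 0, 2, 1, 0]
```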
Weak JOIN support
ClickHouse excels at aggregating data from a single table at a time. If, however, you have a query with JOINs or subqueries, the right-hand side of the JOIN is loaded into memory first. Thus, you should always put the bigger table on the left-hand side!
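This memory behavior follows from how a hash join works. The right-hand table is built into an in-memory hash table, and the left-hand table is streamed through it row by row. Below is a Python sketch of an inner hash join under that model, with hypothetical events/persons rows. It shows why memory use scales with the right-hand side.

```python
from collections import defaultdict


def hash_join(left_rows, right_rows, left_key, right_key):
    """Inner hash join: hash the right side in memory, then stream the left side."""
    table = defaultdict(list)
    for right in right_rows:  # the entire right side must fit in memory
        table[right_key(right)].append(right)
    for left in left_rows:    # the left side is only iterated once
        for right in table.get(left_key(left), ()):
            yield left, right


events = [("e1", "p1"), ("e2", "p2"), ("e3", "p1"), ("e4", "p9")]  # big table: left
persons = [("p1", "Alice"), ("p2", "Bob")]                           # small table: right
for event, person in hash_join(events, persons, lambda e: e[1], lambda p: p[0]):
    print(event[0], person[1])
```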
This means that at scale JOINs can kill performance. Read more on the effect of removing JOINs from our events database here:
Suggested reading
- ClickHouse MergeTree docs
- Why is ClickHouse so fast?
- Overview of ClickHouse Architecture
- ClickHouse Index Design
- How much is too much? ClickHouse limitations
- How to pick an ORDER BY / PRIMARY KEY / PARTITION BY for the MergeTree-family table