This document covers:
- Different options for ingesting data into MergeTree tables and trade-offs involved
- How the Kafka table engine works
- What are materialized views?
- Examples of a full schema setup
Using INSERTs for ingestion
Like any database system, ClickHouse allows using INSERTs to load data.
Each INSERT creates a new part in ClickHouse, which
comes with a lot of overhead and, in a busy system, will lead to errors due to exceeding the
parts_to_throw_insert MergeTree table setting (default 300).
ClickHouse provides several options to keep INSERT-based ingestion workable. For example:
- Batch inserts
- async_insert setting
- Buffer table engine
These come with their own trade-offs and consistency problems, and they require the ClickHouse cluster to always be accessible.
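As a rough illustration of the first two options, the sketch below sends several rows in one INSERT and then sends a small insert with async_insert enabled. The table insert_demo is hypothetical and exists only for this example. Using wait_for_async_insert = 1 is one possible choice, not a recommendation from this document.

```sql
-- Hypothetical table used only for this illustration.
CREATE TABLE insert_demo
(
    team_id Int64,
    type String,
    timestamp DateTime64(6, 'UTC')
)
ENGINE = MergeTree
ORDER BY (team_id, timestamp);

-- Batch insert: three rows in one statement create one part,
-- instead of the three parts that three separate INSERTs would create.
INSERT INTO insert_demo VALUES
    (1, 'event_a', '2023-01-01 00:00:00'),
    (1, 'event_b', '2023-01-01 00:00:01'),
    (2, 'event_a', '2023-01-01 00:00:02');

-- Async insert: the server buffers small inserts and flushes them together.
-- wait_for_async_insert = 1 makes the client wait until the buffer is flushed.
INSERT INTO insert_demo
SETTINGS async_insert = 1, wait_for_async_insert = 1
VALUES (2, 'event_c', '2023-01-01 00:00:03');

-- Count the active parts of the table to see the effect of each approach.
SELECT count() AS active_parts
FROM system.parts
WHERE database = currentDatabase() AND table = 'insert_demo' AND active;
```

Watching active_parts while changing the insert pattern is a simple way to see how close a table is to the part limit mentioned above.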
Why we ingest via Kafka tables
We instead rely on the Kafka table engine to handle ingestion into ClickHouse.
The benefits are:
- Resiliency: Kafka handles sudden spikes in traffic and ClickHouse cluster unavailability gracefully
- PostHog already uses Kafka throughout the app, making it a safe technical choice
It also has minimal memory overhead and lets us temporarily stop ingestion at any time by removing the tables in question.
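One way to read "removing the tables" is detaching the Kafka engine table and attaching it again later. This is an assumption about the mechanism, since the statements actually used are not spelled out here. The table name kafka_ingestion_warnings matches the example Kafka table defined later in this document.

```sql
-- Pause ingestion: detaching the Kafka engine table stops its consumer on this node.
-- Messages remain in the Kafka topic, so nothing is dropped on the ClickHouse side.
DETACH TABLE kafka_ingestion_warnings;

-- Resume ingestion: the consumer rejoins its consumer group and continues
-- from the last committed offsets.
ATTACH TABLE kafka_ingestion_warnings;
```

A plain DETACH does not survive a server restart (the table is attached again on startup), and a long pause is bounded by the retention configured on the Kafka topic. DROP TABLE would also stop consumption, but resuming then requires re-running the original CREATE TABLE statement.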
How Kafka tables work
Kafka engine tables act as Kafka consumers in a given consumer group. Selecting from that table advances the consumer offsets.
A Kafka table on its own does nothing beyond allowing querying data from Kafka - it needs to be paired with other tables for ingestion to work.
Important note: Because Kafka engine tables operate like consumers, querying data from them moves the offsets for the consumer group forward. Doing this while ingesting data may cause data loss, and direct SELECTs have been disallowed by default in recent ClickHouse versions.
Example Kafka engine table:
CREATE TABLE kafka_ingestion_warnings
(
    team_id Int64,
    source LowCardinality(VARCHAR),
    type VARCHAR,
    details VARCHAR CODEC(ZSTD(3)),
    timestamp DateTime64(6, 'UTC')
)
ENGINE = Kafka('kafka:9092', 'clickhouse_ingestion_warnings_test', 'group1', 'JSONEachRow')
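When messages need to be inspected for debugging, one option is to read through a second Kafka engine table that uses its own consumer group, so the offsets of the ingestion group are left alone. The sketch below assumes this approach. The table name kafka_ingestion_warnings_debug and the group name group1_debug are hypothetical, while stream_like_engine_allow_direct_select is the ClickHouse setting that gates direct SELECTs in recent versions.

```sql
-- Hypothetical debug table: same columns as kafka_ingestion_warnings,
-- same topic, but a separate consumer group ('group1_debug').
CREATE TABLE kafka_ingestion_warnings_debug AS kafka_ingestion_warnings
ENGINE = Kafka('kafka:9092', 'clickhouse_ingestion_warnings_test', 'group1_debug', 'JSONEachRow');

-- Direct SELECTs from Kafka tables must be explicitly allowed in recent versions.
-- This advances the offsets of 'group1_debug' only, not those of 'group1'.
SELECT *
FROM kafka_ingestion_warnings_debug
LIMIT 10
SETTINGS stream_like_engine_allow_direct_select = 1;

-- Clean up so the debug consumer does not linger.
DROP TABLE kafka_ingestion_warnings_debug;
```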
It is important to send correctly formatted messages to the topic you're selecting from. When selecting from a Kafka table,
ClickHouse assumes messages in the topic are formatted correctly. If they are not, a malformed message may stall the consumer
and break ingestion, depending on the value of kafka_skip_broken_messages.
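For illustration, the variant below declares the same Kafka table with named engine settings and a non-zero kafka_skip_broken_messages. The table name kafka_ingestion_warnings_tolerant and the value 100 are arbitrary assumptions for this sketch, not values taken from this document. The setting defaults to 0, which means no unparseable messages are tolerated.

```sql
-- Hypothetical variant of the Kafka table using named settings.
CREATE TABLE kafka_ingestion_warnings_tolerant
(
    team_id Int64,
    source LowCardinality(VARCHAR),
    type VARCHAR,
    details VARCHAR CODEC(ZSTD(3)),
    timestamp DateTime64(6, 'UTC')
)
ENGINE = Kafka
SETTINGS
    kafka_broker_list = 'kafka:9092',
    kafka_topic_list = 'clickhouse_ingestion_warnings_test',
    kafka_group_name = 'group1',
    kafka_format = 'JSONEachRow',
    -- Skip up to 100 messages per block that cannot be parsed (arbitrary value).
    kafka_skip_broken_messages = 100;
```

Skipped messages are discarded silently, so a non-zero value trades a stalled consumer for possible unnoticed data loss.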
Beyond just skipping broken messages, it's also possible to set up a dead letter queue system for these in ClickHouse. You can read more about doing so in this Altinity blog post.
Materialized views
Materialized views in ClickHouse can be thought of as triggers - they react to new blocks being INSERTed into source tables and allow transforming and piping that data to other tables.
Materialized views come with a lot of gotchas. A great resource for learning more about them is this presentation.
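The trigger-like behavior can be seen without Kafka at all. In the minimal sketch below, all table and view names (mv_demo_source, mv_demo_counts, mv_demo_mv) are hypothetical. The source table uses the Null engine, which stores nothing, yet the materialized view still sees every inserted block and writes its transformed result to the target table.

```sql
-- Source table: the Null engine discards inserted rows, but attached
-- materialized views still receive each inserted block.
CREATE TABLE mv_demo_source
(
    team_id Int64,
    type String
)
ENGINE = Null;

-- Target table that receives the output of the materialized view.
CREATE TABLE mv_demo_counts
(
    team_id Int64,
    type String,
    warnings UInt64
)
ENGINE = SummingMergeTree
ORDER BY (team_id, type);

-- The view runs its SELECT over each newly inserted block only,
-- never over rows that existed before the view was created.
CREATE MATERIALIZED VIEW mv_demo_mv
TO mv_demo_counts
AS SELECT
    team_id,
    type,
    count() AS warnings
FROM mv_demo_source
GROUP BY team_id, type;

INSERT INTO mv_demo_source VALUES (1, 'a'), (1, 'a'), (2, 'b');

-- Aggregate at read time, since background merges may not have run yet.
SELECT team_id, type, sum(warnings) AS warnings
FROM mv_demo_counts
GROUP BY team_id, type
ORDER BY team_id, type;
-- Returns two rows: (1, 'a', 2) and (2, 'b', 1)
```

The fact that the view only processes the inserted block, not the whole source table, is behind several of the gotchas mentioned above.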
Example schema - reading and writing ingestion warnings
Consider the following sharded table schema together with kafka_ingestion_warnings:
CREATE TABLE sharded_ingestion_warnings
(
    team_id Int64,
    source LowCardinality(VARCHAR),
    type VARCHAR,
    details VARCHAR CODEC(ZSTD(3)),
    timestamp DateTime64(6, 'UTC'),
    _timestamp DateTime,
    _offset UInt64,
    _partition UInt64
)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/posthog.sharded_ingestion_warnings', '{replica}')
PARTITION BY toYYYYMMDD(timestamp)
ORDER BY (team_id, toHour(timestamp), type, source, timestamp);

CREATE TABLE ingestion_warnings ON CLUSTER 'posthog'
(
    team_id Int64,
    source LowCardinality(VARCHAR),
    type VARCHAR,
    details VARCHAR CODEC(ZSTD(3)),
    timestamp DateTime64(6, 'UTC'),
    _timestamp DateTime,
    _offset UInt64,
    _partition UInt64
)
ENGINE = Distributed('posthog', 'posthog', 'sharded_ingestion_warnings', rand());

CREATE MATERIALIZED VIEW ingestion_warnings_mv
TO posthog.ingestion_warnings
AS SELECT
    team_id,
    source,
    type,
    details,
    timestamp,
    _timestamp,
    _offset,
    _partition
FROM posthog.kafka_ingestion_warnings;
In this schema:
- sharded_ingestion_warnings MergeTree table is responsible for storing the ingested data
- ingestion_warnings table is responsible for fielding queries and distributing writes to sharded_ingestion_warnings tables across shards
- ingestion_warnings_mv regularly polls kafka_ingestion_warnings and pushes the data to the ingestion_warnings distributed table
  - Note: it also forwards _timestamp, _offset, and _partition virtual columns containing Kafka message metadata so they can be stored and used during debugging.
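As an example of how the stored Kafka metadata can help during debugging, the queries below are a sketch run against the ingestion_warnings distributed table from the schema above. The one-hour window and the LIMIT are arbitrary choices for illustration.

```sql
-- Per Kafka partition: how many rows arrived recently, the highest offset
-- stored so far, and the time of the most recent message.
SELECT
    _partition,
    count() AS rows_ingested,
    max(_offset) AS latest_offset,
    max(_timestamp) AS latest_message_time
FROM posthog.ingestion_warnings
WHERE timestamp > now() - INTERVAL 1 HOUR
GROUP BY _partition
ORDER BY _partition;

-- Messages stored more than once: the same (partition, offset) pair
-- should normally appear in a single row.
SELECT
    _partition,
    _offset,
    count() AS copies
FROM posthog.ingestion_warnings
GROUP BY _partition, _offset
HAVING copies > 1
LIMIT 10;
```

Comparing latest_offset with the topic's end offsets on the Kafka side gives a rough view of how far ingestion is lagging.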
Example schema visualized
This is the same schema visualized in a ClickHouse cluster with 2 shards and 1 replica each:
Further reading
- Performance Considerations ClickHouse docs
- Using the Kafka table engine
- Kafka ClickHouse docs
- Confluent concepts (Zookeeper)
- ClickHouse Materialized Views Illuminated, Part 1
- ClickHouse's not so secret weapon... Materialized Views
- Everything you should know about materialized views.
Next in the ClickHouse manual: Working with JSON