跳到主要内容
跳到主要内容

使用物化视图构建汇总表,实现快速时间序列分析

本教程将演示如何使用 materialized views 从高吞吐量事件表维护预聚合的汇总数据。 您将创建三个对象:一个原始表、一个汇总表,以及一个会自动将数据写入该汇总表的物化视图。

何时使用此模式

在以下情况下使用此模式:

  • 你有一个仅追加写入的事件流(点击、页面浏览、IoT、日志)。
  • 大多数查询是在时间范围上的聚合(按分钟/小时/天)。
  • 你希望获得稳定的一致的亚秒级读取性能,且无需重新扫描所有原始行。

创建原始事件表

CREATE TABLE events_raw
(
    event_time   DateTime,
    user_id      UInt64,
    country      LowCardinality(String),
    event_type   LowCardinality(String),
    value        Float64
)
ENGINE = MergeTree
PARTITION BY toYYYYMM(event_time)
ORDER BY (event_time, user_id)
TTL event_time + INTERVAL 90 DAY DELETE

注意事项

  • PARTITION BY toYYYYMM(event_time) 可以使分区保持较小,便于删除。
  • ORDER BY (event_time, user_id) 支持时间范围查询 + 二级过滤条件。
  • LowCardinality(String) 能为类别型维度节省内存。
  • TTL 会在 90 天后清理原始数据(可根据保留需求进行调整)。

设计汇总(聚合)表

我们将预聚合到小时粒度。 请根据最常用的分析时间窗口选择合适的粒度。

CREATE TABLE events_rollup_1h
(
    bucket_start  DateTime,            -- 小时起始时间
    country       LowCardinality(String),  -- 国家
    event_type    LowCardinality(String),  -- 事件类型
    users_uniq    AggregateFunction(uniqExact, UInt64),  -- 唯一用户数
    value_sum     AggregateFunction(sum, Float64),  -- 值求和
    value_avg     AggregateFunction(avg, Float64),  -- 值平均值
    events_count  AggregateFunction(count)  -- 事件数量
)
ENGINE = AggregatingMergeTree
PARTITION BY toYYYYMM(bucket_start)
ORDER BY (bucket_start, country, event_type)

我们存储聚合状态(例如 AggregateFunction(sum, ...))来紧凑地表示部分聚合,这些状态可以在后续合并或完成最终计算。

创建物化视图以填充汇总表

此物化视图在向 events_raw 插入数据时自动触发,并将聚合状态写入汇总表。

CREATE MATERIALIZED VIEW mv_events_rollup_1h
TO events_rollup_1h
AS
SELECT
    toStartOfHour(event_time) AS bucket_start,
    country,
    event_type,
    uniqExactState(user_id)   AS users_uniq,
    sumState(value)           AS value_sum,
    avgState(value)           AS value_avg,
    countState()              AS events_count
FROM events_raw
GROUP BY bucket_start, country, event_type;

插入示例数据

插入一些示例数据:

INSERT INTO events_raw VALUES
    (now() - INTERVAL 4 SECOND, 101, 'US', 'view', 1),
    (now() - INTERVAL 3 SECOND, 101, 'US', 'click', 1),
    (now() - INTERVAL 2 SECOND, 202, 'DE', 'view', 1),
    (now() - INTERVAL 1 SECOND, 101, 'US', 'view', 1);

查询汇总表

您可以在读取时合并状态,或者完成它们:

SELECT
    bucket_start,
    country,
    event_type,
    uniqExactMerge(users_uniq) AS users,
    sumMerge(value_sum)        AS value_sum,
    avgMerge(value_avg)        AS value_avg,
    countMerge(events_count)   AS events
FROM events_rollup_1h
WHERE bucket_start >= now() - INTERVAL 1 DAY
GROUP BY ALL
ORDER BY bucket_start, country, event_type;

提示

如果您预期读取操作始终命中汇总数据,可以创建第二个物化视图,将_最终确定的_数值写入"普通"的 MergeTree 表,保持相同的 1 小时粒度。 状态提供了更大的灵活性,而最终确定的数值则使读取操作更加简洁。

在主键字段上过滤以获得最佳性能

您可以使用 EXPLAIN 命令查看索引如何用于裁剪数据:

EXPLAIN indexes=1
SELECT *
FROM events_rollup_1h
WHERE bucket_start BETWEEN now() - INTERVAL 3 DAY AND now()
  AND country = 'US';
┌─explain────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┐
    1.  │ Expression ((Project names + Projection))                                                                                          │
    2.  │   Expression                                                                                                                       │
    3.  │     ReadFromMergeTree (default.events_rollup_1h)                                                                                   │
    4.  │     Indexes:                                                                                                                       │
    5.  │       MinMax                                                                                                                       │
    6.  │         Keys:                                                                                                                      │
    7.  │           bucket_start                                                                                                             │
    8.  │         Condition: and((bucket_start in (-Inf, 1758550242]), (bucket_start in [1758291042, +Inf)))                                 │
    9.  │         Parts: 1/1                                                                                                                 │
    10. │         Granules: 1/1                                                                                                              │
    11. │       Partition                                                                                                                    │
    12. │         Keys:                                                                                                                      │
    13. │           toYYYYMM(bucket_start)                                                                                                   │
    14. │         Condition: and((toYYYYMM(bucket_start) in (-Inf, 202509]), (toYYYYMM(bucket_start) in [202509, +Inf)))                     │
    15. │         Parts: 1/1                                                                                                                 │
    16. │         Granules: 1/1                                                                                                              │
    17. │       PrimaryKey                                                                                                                   │
    18. │         Keys:                                                                                                                      │
    19. │           bucket_start                                                                                                             │
    20. │           country                                                                                                                  │
    21. │         Condition: and((country in ['US', 'US']), and((bucket_start in (-Inf, 1758550242]), (bucket_start in [1758291042, +Inf)))) │
    22. │         Parts: 1/1                                                                                                                 │
    23. │         Granules: 1/1                                                                                                              │
        └────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┘

上述查询执行计划显示使用了三种类型的索引: MinMax 索引、分区索引和主键索引。 每个索引都使用了主键中指定的字段:(bucket_start, country, event_type)。 为获得最佳过滤性能,请确保您的查询使用主键字段来裁剪数据。

常见配置变体

  • 不同粒度:添加一个按日汇总:
CREATE TABLE events_rollup_1d
(
    bucket_start Date,
    country      LowCardinality(String),
    event_type   LowCardinality(String),
    users_uniq   AggregateFunction(uniqExact, UInt64),
    value_sum    AggregateFunction(sum, Float64),
    value_avg    AggregateFunction(avg, Float64),
    events_count AggregateFunction(count)
)
ENGINE = AggregatingMergeTree
PARTITION BY toYYYYMM(bucket_start)
ORDER BY (bucket_start, country, event_type);

然后创建第二个物化视图:

CREATE MATERIALIZED VIEW mv_events_rollup_1d
TO events_rollup_1d
AS
SELECT
    toDate(event_time) AS bucket_start,
    country,
    event_type,
    uniqExactState(user_id),
    sumState(value),
    avgState(value),
    countState()
FROM events_raw
GROUP BY ALL;
  • 压缩:在原始表中的大列上应用压缩编解码器(例如:Codec(ZSTD(3)))。
  • 成本控制:将长保留策略应用于原始表,并保留生命周期较长的汇总表。
  • Backfilling(回填):在加载历史数据时,将数据插入到 events_raw 中,让物化视图自动完成汇总构建。对于已有行,如果适用,可以在创建物化视图时使用 POPULATE,或使用 INSERT SELECT

清理与保留策略

  • 延长原始数据的 TTL(例如 30/90 天),但将汇总数据的保留时间设得更长(例如 1 年)。
  • 如果启用了分层存储,你也可以使用 TTL 将 旧的数据分片迁移到更便宜的存储中。

故障排除

  • 物化视图没有更新?检查插入是否写入到 events_raw(而不是汇总表),并确认物化视图的目标表是否正确(TO events_rollup_1h)。
  • 查询变慢?请确认这些查询确实命中了 rollup(直接查询 rollup 表),并确保时间过滤条件与 rollup 表的汇总粒度对齐。
  • 回填数据有不一致?使用 SYSTEM FLUSH LOGS,并检查 system.query_logsystem.parts,以确认插入和合并是否正确完成。