メインコンテンツへスキップ
メインコンテンツへスキップ

マテリアライズドビューを使ってロールアップを構築し、高速な時系列分析を行う

このチュートリアルでは、高ボリュームのイベントテーブルから事前集計されたロールアップを、マテリアライズドビュー を使って維持する方法を説明します。 ここでは、3 つのオブジェクト――生データ用テーブル、ロールアップテーブル、およびロールアップテーブルに自動的に書き込むマテリアライズドビュー――を作成します。

このパターンを使用する状況

次のような場合にこのパターンを使用します:

  • 追記専用のイベントストリーム(クリック、ページビュー、IoT、ログ)を扱っている。
  • ほとんどのクエリが、時間範囲(分/時間/日ごと)に対する集計処理である。
  • すべての生データ行を再スキャンすることなく、常に 1 秒未満の読み取りレイテンシを実現したい。

rawイベントテーブルを作成する

CREATE TABLE events_raw
(
    event_time   DateTime,
    user_id      UInt64,
    country      LowCardinality(String),
    event_type   LowCardinality(String),
    value        Float64
)
ENGINE = MergeTree
PARTITION BY toYYYYMM(event_time)
ORDER BY (event_time, user_id)
TTL event_time + INTERVAL 90 DAY DELETE

注記

  • PARTITION BY toYYYYMM(event_time) を指定すると、パーティションを小さく保ち、容易に削除できるようになります。
  • ORDER BY (event_time, user_id) は、時間範囲を指定したクエリおよびセカンダリフィルタをサポートします。
  • LowCardinality(String) は、カテゴリ値を持つディメンションのメモリ消費を抑えます。
  • TTL は 90 日後に生データを自動的に削除します(保持期間の要件に合わせて調整してください)。

ロールアップ(集約)テーブルの設計

時間単位の粒度で事前集計を行います。 最も一般的な分析ウィンドウに合わせて粒度を選択してください。

CREATE TABLE events_rollup_1h
(
    bucket_start  DateTime,            -- 1時間の開始時刻
    country       LowCardinality(String),
    event_type    LowCardinality(String),
    users_uniq    AggregateFunction(uniqExact, UInt64),
    value_sum     AggregateFunction(sum, Float64),
    value_avg     AggregateFunction(avg, Float64),
    events_count  AggregateFunction(count)
)
ENGINE = AggregatingMergeTree
PARTITION BY toYYYYMM(bucket_start)
ORDER BY (bucket_start, country, event_type)

集約状態(例:AggregateFunction(sum, ...))を保存します。これにより部分集約をコンパクトに表現し、後でマージまたは最終化することができます。

ロールアップにデータを投入するマテリアライズドビューを作成する

このマテリアライズドビューは events_raw へのデータ挿入時に自動的に実行され、集約状態をロールアップテーブルに書き込みます。

CREATE MATERIALIZED VIEW mv_events_rollup_1h
TO events_rollup_1h
AS
SELECT
    toStartOfHour(event_time) AS bucket_start,
    country,
    event_type,
    uniqExactState(user_id)   AS users_uniq,
    sumState(value)           AS value_sum,
    avgState(value)           AS value_avg,
    countState()              AS events_count
FROM events_raw
GROUP BY bucket_start, country, event_type;

サンプルデータを挿入する

サンプルデータを挿入する:

INSERT INTO events_raw VALUES
    (now() - INTERVAL 4 SECOND, 101, 'US', 'view', 1),
    (now() - INTERVAL 3 SECOND, 101, 'US', 'click', 1),
    (now() - INTERVAL 2 SECOND, 202, 'DE', 'view', 1),
    (now() - INTERVAL 1 SECOND, 101, 'US', 'view', 1);

ロールアップへのクエリ実行

読み取り時に状態をマージするか、確定するかを選択できます:

SELECT
    bucket_start,
    country,
    event_type,
    uniqExactMerge(users_uniq) AS users,
    sumMerge(value_sum)        AS value_sum,
    avgMerge(value_avg)        AS value_avg,
    countMerge(events_count)   AS events
FROM events_rollup_1h
WHERE bucket_start >= now() - INTERVAL 1 DAY
GROUP BY ALL
ORDER BY bucket_start, country, event_type;

ヒント

読み取りが常にロールアップにヒットすることが想定される場合は、2つ目のマテリアライズドビューを作成して、_確定済み_の数値を同じ1時間粒度の「プレーン」なMergeTreeテーブルに書き込むことができます。 状態を使用すると柔軟性が高まりますが、確定済みの数値を使用すると読み取りがやや簡潔になります。

最適なパフォーマンスのためにプライマリキーのフィールドでフィルタリングする

EXPLAINコマンドを使用して、インデックスがデータの刈り込みにどのように使用されているかを確認できます:

EXPLAIN indexes=1
SELECT *
FROM events_rollup_1h
WHERE bucket_start BETWEEN now() - INTERVAL 3 DAY AND now()
  AND country = 'US';
┌─explain────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┐
    1.  │ Expression ((Project names + Projection))                                                                                          │
    2.  │   Expression                                                                                                                       │
    3.  │     ReadFromMergeTree (default.events_rollup_1h)                                                                                   │
    4.  │     Indexes:                                                                                                                       │
    5.  │       MinMax                                                                                                                       │
    6.  │         Keys:                                                                                                                      │
    7.  │           bucket_start                                                                                                             │
    8.  │         Condition: and((bucket_start in (-Inf, 1758550242]), (bucket_start in [1758291042, +Inf)))                                 │
    9.  │         Parts: 1/1                                                                                                                 │
    10. │         Granules: 1/1                                                                                                              │
    11. │       Partition                                                                                                                    │
    12. │         Keys:                                                                                                                      │
    13. │           toYYYYMM(bucket_start)                                                                                                   │
    14. │         Condition: and((toYYYYMM(bucket_start) in (-Inf, 202509]), (toYYYYMM(bucket_start) in [202509, +Inf)))                     │
    15. │         Parts: 1/1                                                                                                                 │
    16. │         Granules: 1/1                                                                                                              │
    17. │       PrimaryKey                                                                                                                   │
    18. │         Keys:                                                                                                                      │
    19. │           bucket_start                                                                                                             │
    20. │           country                                                                                                                  │
    21. │         Condition: and((country in ['US', 'US']), and((bucket_start in (-Inf, 1758550242]), (bucket_start in [1758291042, +Inf)))) │
    22. │         Parts: 1/1                                                                                                                 │
    23. │         Granules: 1/1                                                                                                              │
        └────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┘

上記のクエリ実行計画では、3種類のインデックスが使用されています: MinMaxインデックス、パーティションインデックス、プライマリキーインデックスです。 各インデックスは、プライマリキーで指定されたフィールド (bucket_start, country, event_type) を使用します。 最適なフィルタリングパフォーマンスを実現するには、クエリがプライマリキーフィールドを使用してデータをプルーニングしていることを確認してください。

一般的な構成パターン

  • 異なる粒度: 日次ロールアップを追加:
CREATE TABLE events_rollup_1d
(
    bucket_start Date,
    country      LowCardinality(String),
    event_type   LowCardinality(String),
    users_uniq   AggregateFunction(uniqExact, UInt64),
    value_sum    AggregateFunction(sum, Float64),
    value_avg    AggregateFunction(avg, Float64),
    events_count AggregateFunction(count)
)
ENGINE = AggregatingMergeTree
PARTITION BY toYYYYMM(bucket_start)
ORDER BY (bucket_start, country, event_type);

次に2つ目のマテリアライズドビューを作成します:

CREATE MATERIALIZED VIEW mv_events_rollup_1d
TO events_rollup_1d
AS
SELECT
    toDate(event_time) AS bucket_start,
    country,
    event_type,
    uniqExactState(user_id),
    sumState(value),
    avgState(value),
    countState()
FROM events_raw
GROUP BY ALL;
  • 圧縮: raw テーブルのサイズの大きなカラムにコーデックを適用します(例: Codec(ZSTD(3)))。
  • コスト制御:長期保持は生データテーブル側に寄せ、ロールアップは長期間維持する。
  • バックフィル:過去データを読み込む場合は、events_raw に挿入し、マテリアライズドビューにロールアップを自動的に作成させます。既存の行については、適切であればマテリアライズドビュー作成時に POPULATE を使用するか、INSERT SELECT を使用します。

クリーンアップとデータ保持

  • 生データのTTLは(例: 30/90日)に延長しつつ、ロールアップは(例: 1年)と、さらに長期間保持します。
  • ティアリングが有効な場合は、古いパーツをより安価なストレージに移動するために TTL to move を使用することもできます。

トラブルシューティング

  • マテリアライズドビューが更新されない場合は、データの挿入先がロールアップテーブルではなく events_raw になっていること、またマテリアライズドビューの出力先が正しいこと(TO events_rollup_1h)を確認してください。
  • クエリが遅い場合は、ロールアップにヒットしているか(ロールアップテーブルを直接クエリする)、また時間フィルターがロールアップの粒度に合っているかを確認します。
  • バックフィルで不整合が発生していますか?SYSTEM FLUSH LOGS を実行し、system.query_logsystem.parts を確認して、INSERT とマージが正しく行われていることを確認してください。