メインコンテンツへスキップ
メインコンテンツへスキップ

DynamoDB から ClickHouse への CDC

Experimental feature. Learn more.

このページでは、ClickPipes を使用して DynamoDB から ClickHouse へ CDC(変更データキャプチャ)を設定する方法を説明します。この連携は次の 2 つのコンポーネントから構成されます。

  1. S3 ClickPipes による初回スナップショット
  2. Kinesis ClickPipes によるリアルタイム更新

データは ReplacingMergeTree に取り込まれます。このテーブルエンジンは、更新操作を反映できるようにするため、CDC シナリオで一般的に使用されます。このパターンの詳細は、以下のブログ記事で確認できます。

1. Kinesis ストリームをセットアップする

まず、DynamoDB テーブルに対して Kinesis ストリームを有効化し、変更をリアルタイムで取得できるようにします。スナップショットを作成する前にこれを実施して、データの取りこぼしを防ぎます。 AWS のガイドを参照してください。

DynamoDB Kinesis ストリーム

2. スナップショットを作成する

次に、DynamoDB テーブルのスナップショットを取得します。これは、AWS の S3 へのエクスポート機能を使用して実行します。AWS のガイドはこちらにあります。 DynamoDB JSON 形式で「Full export」を実行してください。

DynamoDB S3 Export

3. スナップショットを ClickHouse に読み込む

必要なテーブルを作成する

DynamoDB からのスナップショットデータはおおよそ次のようになります:

{
  "age": {
    "N": "26"
  },
  "first_name": {
    "S": "sally"
  },
  "id": {
    "S": "0A556908-F72B-4BE6-9048-9E60715358D4"
  }
}

データがネストされた形式になっていることがわかります。ClickHouse にロードする前に、このデータをフラット化する必要があります。これは、ClickHouse の JSONExtract 関数を用いたマテリアライズドビューで実行できます。

ここでは、次の 3 つのテーブルを作成します。

  1. DynamoDB からの生データを保存するテーブル
  2. 最終的なフラット化済みデータを保存するテーブル(宛先テーブル)
  3. データをフラット化するためのマテリアライズドビュー

上記の DynamoDB の例データに対して、ClickHouse のテーブルは次のようになります。

/* スナップショットテーブル */
CREATE TABLE IF NOT EXISTS "default"."snapshot"
(
    `item` String
)
ORDER BY tuple();

/* フラット化済み最終データ用テーブル */
CREATE MATERIALIZED VIEW IF NOT EXISTS "default"."snapshot_mv" TO "default"."destination" AS
SELECT
    JSONExtractString(item, 'id', 'S') AS id,
    JSONExtractInt(item, 'age', 'N') AS age,
    JSONExtractString(item, 'first_name', 'S') AS first_name
FROM "default"."snapshot";

/* フラット化済み最終データ用テーブル */
CREATE TABLE IF NOT EXISTS "default"."destination" (
    "id" String,
    "first_name" String,
    "age" Int8,
    "version" Int64
)
ENGINE ReplacingMergeTree("version")
ORDER BY id;

宛先テーブルには、いくつかの要件があります。

  • このテーブルは ReplacingMergeTree テーブルである必要があります。
  • テーブルには version カラムが必要です。
    • 後続のステップで、Kinesis ストリームの ApproximateCreationDateTime フィールドを version カラムにマッピングします。
  • テーブルはパーティションキーをソートキー(ORDER BY で指定)として使用する必要があります。
    • 同じソートキーを持つ行は、version カラムに基づいて重複排除されます。

スナップショット用 ClickPipe の作成

ここまでで、S3 から ClickHouse へスナップショットデータをロードするための ClickPipe を作成できます。S3 ClickPipe ガイドはこちらに従いますが、次の設定を使用してください。

  • Ingest path: S3 内のエクスポートされた JSON ファイルのパスを特定する必要があります。パスは次のような形式になります。
https://{bucket}.s3.amazonaws.com/{prefix}/AWSDynamoDB/{export-id}/data/*
  • Format: JSONEachRow
  • Table: スナップショット用テーブル(例: 上記の例では default.snapshot

作成が完了すると、スナップショットテーブルと宛先テーブルへのデータ取り込みが開始されます。次の手順に進む前に、スナップショットの読み込み処理が完了するのを待つ必要はありません。

4. Kinesis ClickPipe を作成する

ここでは、Kinesis ストリームからのリアルタイムな変更を取り込むための Kinesis ClickPipe をセットアップします。Kinesis ClickPipe ガイドはこちらを参照し、次の設定を使用してください:

  • Stream: ステップ 1 で使用した Kinesis ストリーム
  • Table: 宛先テーブル(例: 上記の例では default.destination
  • Flatten object: true
  • Column mappings:
    • ApproximateCreationDateTime: version
    • 他のフィールドは、以下に示すように適切な宛先カラムにマッピングする
DynamoDB カラムマッピング

5. クリーンアップ(オプション)

スナップショット ClickPipe の処理が完了したら、スナップショットテーブルとマテリアライズドビューを削除できます。

DROP TABLE IF EXISTS "default"."snapshot";
DROP TABLE IF EXISTS "default"."snapshot_clickpipes_error";
DROP VIEW IF EXISTS "default"."snapshot_mv";