メインコンテンツへスキップ
メインコンテンツへスキップ

Amazon MSK と ClickHouse の統合

注記: 動画内で示されているポリシーは権限が広く、クイックスタート用途のみを想定しています。以下の最小権限の IAM ガイダンスを参照してください。

前提条件

次のことを前提とします:

ClickHouse 公式 Kafka コネクタと Amazon MSK の連携

接続情報を確認する

HTTP(S) で ClickHouse に接続するには、次の情報が必要です。

Parameter(s)Description
HOST and PORT通常、TLS を使用する場合のポートは 8443、TLS を使用しない場合のポートは 8123 です。
DATABASE NAME既定で default という名前のデータベースが用意されています。接続したいデータベースの名前を使用してください。
USERNAME and PASSWORD既定のユーザー名は default です。用途に応じて適切なユーザー名を使用してください。

ClickHouse Cloud サービスに関する詳細情報は、ClickHouse Cloud コンソールで確認できます。 サービスを選択し、Connect をクリックします。

ClickHouse Cloud サービスの Connect ボタン

HTTPS を選択します。接続情報は、サンプルの curl コマンド内に表示されます。

ClickHouse Cloud HTTPS 接続詳細

自己管理型の ClickHouse を使用している場合、接続情報は ClickHouse 管理者によって設定されます。

手順

  1. ClickHouse Connector Sink に目を通しておく。
  2. MSK インスタンスを作成する
  3. IAM ロールを作成して割り当てる
  4. ClickHouse Connect Sink の リリースページ から jar ファイルをダウンロードする。
  5. ダウンロードした jar ファイルを、Amazon MSK コンソールの カスタムプラグインページ にインストールする。
  6. コネクタがパブリックな ClickHouse インスタンスと通信する場合は、インターネットアクセスを有効化する
  7. 設定に、トピック名、ClickHouse インスタンスのホスト名、およびパスワードを指定する。
connector.class=com.clickhouse.kafka.connect.ClickHouseSinkConnector
tasks.max=1
topics=<topic_name>
ssl=true
security.protocol=SSL
hostname=<hostname>
database=<database_name>
password=<password>
ssl.truststore.location=/tmp/kafka.client.truststore.jks
port=8443
value.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.json.JsonConverter
exactlyOnce=true
username=default
schemas.enable=false

推奨 IAM 権限(最小権限)

環境に必要な最小限の権限だけを付与してください。まずは以下のベースラインから始め、利用するサービスがある場合にのみオプションの権限を追加します。

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "MSKClusterAccess",
      "Effect": "Allow",
      "Action": [
        "kafka:DescribeCluster",
        "kafka:GetBootstrapBrokers",
        "kafka:DescribeClusterV2",
        "kafka:ListClusters",
        "kafka:ListClustersV2"
      ],
      "Resource": "*"
    },
    {
      "Sid": "KafkaAuthorization",
      "Effect": "Allow",
      "Action": [
        "kafka-cluster:Connect",
        "kafka-cluster:DescribeCluster",
        "kafka-cluster:DescribeGroup",
        "kafka-cluster:DescribeTopic",
        "kafka-cluster:ReadData"
      ],
      "Resource": "*"
    },
    {
      "Sid": "OptionalGlueSchemaRegistry",
      "Effect": "Allow",
      "Action": [
        "glue:GetSchema*",
        "glue:ListSchemas",
        "glue:ListSchemaVersions"
      ],
      "Resource": "*"
    },
    {
      "Sid": "OptionalSecretsManager",
      "Effect": "Allow",
      "Action": [
        "secretsmanager:GetSecretValue"
      ],
      "Resource": [
        "arn:aws:secretsmanager:<region>:<account-id>:secret:<your-secret-name>*"
      ]
    },
    {
      "Sid": "OptionalS3Read",
      "Effect": "Allow",
      "Action": [
        "s3:GetObject"
      ],
      "Resource": "arn:aws:s3:::<your-bucket>/<optional-prefix>/*"
    }
  ]
}
  • AWS Glue Schema Registry を使用している場合にのみ、Glue ブロックを使用してください。
  • Secrets Manager から認証情報や truststore を取得する場合にのみ、Secrets Manager ブロックを使用してください。ARN のスコープを適切に絞り込んでください。
  • S3 からアーティファクト(例: truststore)を読み込む場合にのみ、S3 ブロックを使用してください。バケット/プレフィックス単位でスコープを絞り込んでください。

あわせて参照してください: Kafka のベストプラクティス – IAM.

パフォーマンスチューニング

パフォーマンスを向上させる 1 つの方法は、worker の設定に次の項目を追加し、Kafka から取得するバッチサイズとレコード数を調整することです。

consumer.max.poll.records=[レコード数]
consumer.max.partition.fetch.bytes=[レコード数 × バイト単位のレコードサイズ]

使用する具体的な値は、必要とするレコード数やレコードサイズによって異なります。たとえば、デフォルト値は次のとおりです。

consumer.max.poll.records=500
consumer.max.partition.fetch.bytes=1048576

実装に関する詳細やその他の検討事項については、公式の Kafka ドキュメントおよび Amazon MSK ドキュメントを参照してください。

MSK Connect のネットワーキングに関する注意事項

MSK Connect から ClickHouse に接続できるようにするには、MSK クラスターをプライベートサブネット内に配置し、インターネットアクセス用にプライベート NAT ゲートウェイを接続することを推奨します。設定手順は以下のとおりです。パブリックサブネットもサポートされていますが、ENI に Elastic IP アドレスを継続的に割り当てる必要があるため推奨されません。詳細は AWS のドキュメントを参照してください

  1. プライベートサブネットを作成する: VPC 内に新しいサブネットを作成し、それをプライベートサブネットとして指定します。このサブネットはインターネットへ直接アクセスできないようにします。
  2. NAT ゲートウェイを作成する: VPC のパブリックサブネット内に NAT ゲートウェイを作成します。NAT ゲートウェイにより、プライベートサブネット内のインスタンスがインターネットや他の AWS サービスへ接続できる一方で、インターネット側からそれらのインスタンスへの接続開始は防止されます。
  3. ルートテーブルを更新する: インターネット向けトラフィックを NAT ゲートウェイに転送するルートを追加します。
  4. セキュリティグループおよびネットワーク ACL の設定を確認する: 関連するトラフィックを許可するように セキュリティグループ および ネットワーク ACL (Access Control Lists) を設定します。
    1. MSK Connect ワーカー ENI から、TLS ポート(一般的には 9094)の MSK ブローカーへのトラフィック。
    2. MSK Connect ワーカー ENI から ClickHouse エンドポイントへのトラフィック: 9440(ネイティブ TLS)または 8443(HTTPS)。
    3. ブローカーのセキュリティグループで、MSK Connect ワーカーのセキュリティグループからのインバウンドを許可します。
    4. セルフホストの ClickHouse の場合は、サーバーで設定しているポート(デフォルトでは HTTP 用に 8123)を開放します。
  5. セキュリティグループを MSK にアタッチする: これらのセキュリティグループが MSK クラスターおよび MSK Connect ワーカーにアタッチされていることを確認します。
  6. ClickHouse Cloud への接続:
    1. パブリックエンドポイント + IP 許可リスト方式: プライベートサブネットから NAT 経由での送信(アウトバウンド)トラフィックが必要です。
    2. 利用可能な場合のプライベート接続(例: VPC ピアリング / PrivateLink / VPN)。VPC の DNS ホスト名/名前解決が有効化されており、DNS がプライベートエンドポイントを解決できることを確認します。
  7. 接続検証(簡易チェックリスト):
    1. コネクターの実行環境から MSK のブートストラップ DNS を解決し、ブローカーポートへ TLS で接続できること。
    2. ClickHouse の 9440 ポート(または HTTPS 用の 8443)へ TLS 接続を確立できること。
    3. AWS のサービス(Glue / Secrets Manager)を使用する場合、それらのエンドポイントへの送信(アウトバウンド)トラフィックが許可されていること。