Apache NiFiをClickHouseに接続する
Apache NiFi
は、ソフトウェアシステム間のデータフローを自動化するために設計されたオープンソースのワークフロー管理ソフトウェアです。ETLデータパイプラインの作成が可能で、300以上のデータプロセッサが同梱されています。このステップバイステップのチュートリアルでは、Apache NiFiをソースと宛先の両方としてClickHouseに接続し、サンプルデータセットをロードする方法を説明します。
接続情報を収集する
HTTP(S) で ClickHouse に接続するには、次の情報が必要です。
| Parameter(s) | Description |
|---|---|
HOST and PORT | 通常、TLS を使用する場合のポートは 8443、TLS を使用しない場合のポートは 8123 です。 |
DATABASE NAME | 既定で default という名前のデータベースが用意されています。接続したいデータベースの名前を使用してください。 |
USERNAME and PASSWORD | 既定のユーザー名は default です。用途に応じて適切なユーザー名を使用してください。 |
ClickHouse Cloud サービスに関する詳細情報は、ClickHouse Cloud コンソールで確認できます。 サービスを選択し、Connect をクリックします。

HTTPS を選択します。接続情報は、サンプルの curl コマンド内に表示されます。

自己管理型の ClickHouse を使用している場合、接続情報は ClickHouse 管理者によって設定されます。
Apache NiFi のダウンロードと実行
新規セットアップを行う場合は、https://nifi.apache.org/download.html からバイナリをダウンロードし、./bin/nifi.sh start を実行して起動します。
ClickHouse JDBC ドライバーをダウンロードする
- GitHub の ClickHouse JDBC ドライバーのリリースページ にアクセスし、最新の JDBC リリースバージョンを確認します
- 対象のリリースバージョンで「Show all xx assets」をクリックし、「shaded」または「all」というキーワードを含む JAR ファイルを探します(例:
clickhouse-jdbc-0.5.0-all.jar) - JAR ファイルを Apache NiFi からアクセス可能なフォルダに配置し、その絶対パスを控えておきます
DBCPConnectionPool コントローラサービスを追加し、プロパティを設定する
-
Apache NiFi でコントローラサービスを設定するには、歯車アイコン("gear" ボタン)をクリックして NiFi Flow Configuration ページを開きます

-
Controller Services タブを選択し、右上の
+ボタンをクリックして新しいコントローラサービスを追加します
-
DBCPConnectionPoolを検索し、「Add」ボタンをクリックします
-
追加したばかりの
DBCPConnectionPoolは、デフォルトでは無効 (Invalid) 状態になっています。歯車アイコン("gear" ボタン)をクリックして設定を開始します
-
「Properties」セクションで、次の値を入力します
| Property | Value | Remark |
|---|---|---|
| Database Connection URL | jdbc:ch:https://HOSTNAME:8443/default?ssl=true | 接続 URL 中の HOSTNAME を環境に合わせて置き換えます |
| Database Driver Class Name | com.clickhouse.jdbc.ClickHouseDriver | |
| Database Driver Location(s) | /etc/nifi/nifi-X.XX.X/lib/clickhouse-jdbc-0.X.X-patchXX-shaded.jar | ClickHouse JDBC ドライバ JAR ファイルへの絶対パス |
| Database User | default | ClickHouse ユーザー名 |
| Password | password | ClickHouse パスワード |
-
Settings セクションで、コントローラサービスの名前を分かりやすくするために「ClickHouse JDBC」に変更します

-
「lightning」ボタンをクリックし、続いて「Enable」ボタンをクリックして
DBCPConnectionPoolコントローラサービスを有効化します

-
Controller Services タブを開き、コントローラサービスが有効化されていることを確認します

ExecuteSQL プロセッサを使用してテーブルから読み取る
-
適切なアップストリームおよびダウンストリームのプロセッサと共に、
ExecuteSQLプロセッサを追加します
-
ExecuteSQLプロセッサの「Properties」セクションで、次の値を入力しますProperty Value Remark Database Connection Pooling Service ClickHouse JDBC ClickHouse 用に設定した Controller Service を選択します SQL select query SELECT * FROM system.metrics ここにクエリを入力します -
ExecuteSQLプロセッサを開始します
-
クエリが正常に処理されたことを確認するため、出力キュー内の
FlowFileの 1 つを開いて確認します
-
表示を「formatted」に切り替えて、出力された
FlowFileの結果を表示します
MergeRecordとPutDatabaseRecordプロセッサを使用してテーブルに書き込む
-
単一のINSERT文で複数行を書き込むには、まず複数のレコードを1つのレコードにマージする必要があります。これは
MergeRecordプロセッサを使用して実行できます -
MergeRecordプロセッサの「Properties」セクションで、以下の値を入力してくださいProperty Value Remark Record Reader JSONTreeReader適切なレコードリーダーを選択してください Record Writer JSONReadSetWriter適切なレコードライターを選択してください Minimum Number of Records 1000 最小行数をマージして1つのレコードを形成するように、この値をより大きな数値に変更してください。デフォルトは1行です Maximum Number of Records 10000 「Minimum Number of Records」よりも大きな数値に変更してください。デフォルトは1,000行です -
複数のレコードが1つにマージされていることを確認するには、
MergeRecordプロセッサの入力と出力を確認してください。出力は複数の入力レコードの配列であることに注意してください入力

出力

-
PutDatabaseRecordプロセッサの「Properties」セクションで、以下の値を入力してくださいProperty Value Remark Record Reader JSONTreeReader適切なレコードリーダーを選択してください Database Type Generic デフォルトのままにしてください Statement Type INSERT Database Connection Pooling Service ClickHouse JDBC ClickHouseコントローラーサービスを選択してください Table Name tbl ここにテーブル名を入力してください Translate Field Names false 挿入されるフィールド名がカラム名と一致する必要があるように「false」に設定してください Maximum Batch Size 1000 INSERT文あたりの最大行数。この値は MergeRecordプロセッサの「Minimum Number of Records」の値より小さくしないでください -
各INSERT文に複数行が含まれていることを確認するには、テーブルの行数が
MergeRecordで定義された「Minimum Number of Records」の値以上ずつ増加していることを確認してください。
-
おめでとうございます - Apache NiFiを使用してClickHouseへのデータ読み込みに成功しました!