将 Apache NiFi 连接到 ClickHouse
Apache NiFi
是一款开源工作流管理软件,旨在实现软件系统间的数据流自动化。它支持创建 ETL 数据管道,并内置超过 300 个数据处理器。本分步教程将演示如何将 Apache NiFi 同时作为数据源和目标连接到 ClickHouse,并加载示例数据集。
准备连接信息
要通过 HTTP(S) 连接到 ClickHouse,您需要以下信息:
| 参数 | 说明 |
|---|---|
HOST 和 PORT | 通常,在使用 TLS 时端口为 8443,不使用 TLS 时端口为 8123。 |
DATABASE NAME | 默认提供一个名为 default 的数据库,请填写您要连接的目标数据库名称。 |
USERNAME 和 PASSWORD | 默认用户名为 default。请使用适合您使用场景的用户名。 |
您的 ClickHouse Cloud 服务的详细信息可以在 ClickHouse Cloud 控制台中查看。 选择某个服务并点击 Connect:

选择 HTTPS。连接信息会显示在示例 curl 命令中。

如果您使用的是自托管 ClickHouse,则连接信息由您的 ClickHouse 管理员进行设置。
下载并运行 Apache NiFi
对于全新部署,请从 https://nifi.apache.org/download.html 下载二进制文件,并通过运行 ./bin/nifi.sh start 来启动
下载 ClickHouse JDBC 驱动程序
- 访问 GitHub 上的 ClickHouse JDBC 驱动程序发布页面,并找到最新的 JDBC 发行版本
- 在该版本的发布内容中,点击 “Show all xx assets”,然后查找文件名中包含关键字 “shaded” 或 “all” 的 JAR 文件,例如
clickhouse-jdbc-0.5.0-all.jar - 将该 JAR 文件放置在 Apache NiFi 可访问的文件夹中,并记录其绝对路径
添加 DBCPConnectionPool Controller Service 并配置其属性
-
要在 Apache NiFi 中配置 Controller Service,点击“齿轮”按钮打开 NiFi Flow Configuration 页面

-
选择 Controller Services 选项卡,并点击右上角的
+按钮添加一个新的 Controller Service
-
搜索
DBCPConnectionPool并点击 “Add” 按钮
-
新添加的
DBCPConnectionPool默认处于 Invalid 状态。点击“齿轮”按钮开始配置
-
在 Properties 部分中,输入以下值
| Property | Value | Remark |
|---|---|---|
| Database Connection URL | jdbc:ch:https://HOSTNAME:8443/default?ssl=true | 根据实际情况替换连接 URL 中的 HOSTNAME |
| Database Driver Class Name | com.clickhouse.jdbc.ClickHouseDriver | |
| Database Driver Location(s) | /etc/nifi/nifi-X.XX.X/lib/clickhouse-jdbc-0.X.X-patchXX-shaded.jar | ClickHouse JDBC 驱动 JAR 文件的绝对路径 |
| Database User | default | ClickHouse 用户名 |
| Password | password | ClickHouse 密码 |
-
在 Settings 部分,将该 Controller Service 的名称修改为 “ClickHouse JDBC”,以便于识别

-
点击“闪电”按钮,然后点击 “Enable” 按钮,激活
DBCPConnectionPoolController Service

-
检查 Controller Services 选项卡,确认该 Controller Service 已启用

使用 ExecuteSQL 处理器从表中读取数据
-
添加一个
ExecuteSQL处理器,并配置相应的上游和下游处理器
-
在
ExecuteSQL处理器的 "Properties" 部分中,输入以下值Property Value Remark Database Connection Pooling Service ClickHouse JDBC 选择为 ClickHouse 配置的 Controller Service SQL select query SELECT * FROM system.metrics 在此输入你的查询 -
启动
ExecuteSQL处理器
-
为确认查询已成功处理,检查输出队列中的任意一个
FlowFile
-
将视图切换为 "formatted" 以查看输出
FlowFile的结果
使用 MergeRecord 和 PutDatabaseRecord 处理器写入表
-
要在单次插入中写入多行数据,首先需要将多条记录合并为单条记录。可以使用
MergeRecord处理器来完成此操作 -
在
MergeRecord处理器的"Properties"部分,输入以下值Property Value Remark Record Reader JSONTreeReader选择适当的记录读取器 Record Writer JSONReadSetWriter选择适当的记录写入器 Minimum Number of Records 1000 将此值更改为更大的数字,以便合并最小数量的行来形成单条记录。默认为 1 行 Maximum Number of Records 10000 将此值更改为大于"Minimum Number of Records"的数字。默认为 1,000 行 -
要确认多条记录已合并为一条,请检查
MergeRecord处理器的输入和输出。注意输出是多条输入记录组成的数组输入

输出

-
在
PutDatabaseRecord处理器的"Properties"部分,输入以下值Property Value Remark Record Reader JSONTreeReader选择适当的记录读取器 Database Type Generic 保留默认值 Statement Type INSERT Database Connection Pooling Service ClickHouse JDBC 选择 ClickHouse 控制器服务 Table Name tbl 在此处输入您的表名 Translate Field Names false 设置为"false",以便插入的字段名称必须与列名匹配 Maximum Batch Size 1000 每次插入的最大行数。此值不应低于 MergeRecord处理器中"Minimum Number of Records"的值 -
要确认每次插入包含多行数据,请检查表中的行数是否至少按
MergeRecord中定义的"Minimum Number of Records"值递增
-
恭喜 - 您已成功使用 Apache NiFi 将数据加载到 ClickHouse 中!