跳到主要内容
跳到主要内容

集成 Apache Beam 与 ClickHouse

ClickHouse Supported

Apache Beam 是一个开源的统一编程模型,使开发者能够定义和执行批处理和流式(连续)数据处理管道。Apache Beam 的灵活性体现在它能够支持广泛的数据处理场景,从 ETL(抽取、转换、加载)操作到复杂事件处理和实时分析。 本集成在数据写入层使用了 ClickHouse 官方的 JDBC 连接器

集成包

用于集成 Apache Beam 和 ClickHouse 的集成包由 Apache Beam I/O Connectors 维护和开发——这是一个汇集众多主流数据存储系统和数据库的集成组件集合。 org.apache.beam.sdk.io.clickhouse.ClickHouseIO 的实现位于 Apache Beam 仓库 中。

设置 Apache Beam ClickHouse 包

安装包

将以下依赖添加到你的包管理工具中:

<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-io-clickhouse</artifactId>
    <version>${beam.version}</version>
</dependency>
推荐的 Beam 版本

ClickHouseIO 连接器推荐从 Apache Beam 版本 2.59.0 起使用。 较早的版本可能无法完全支持该连接器的功能。

相关构件可以在官方 Maven 仓库中找到。

代码示例

以下示例将名为 input.csv 的 CSV 文件读取为 PCollection,将其转换为 Row 对象(基于已定义的 schema),并使用 ClickHouseIO 将其插入到本地 ClickHouse 实例中:


package org.example;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.clickhouse.ClickHouseIO;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.joda.time.DateTime;

public class Main {

    public static void main(String[] args) {
        // 创建 Pipeline 对象。
        Pipeline p = Pipeline.create();

        Schema SCHEMA =
                Schema.builder()
                        .addField(Schema.Field.of("name", Schema.FieldType.STRING).withNullable(true))
                        .addField(Schema.Field.of("age", Schema.FieldType.INT16).withNullable(true))
                        .addField(Schema.Field.of("insertion_time", Schema.FieldType.DATETIME).withNullable(false))
                        .build();

        // 将转换应用到管道。
        PCollection<String> lines = p.apply("读取行", TextIO.read().from("src/main/resources/input.csv"));

        PCollection<Row> rows = lines.apply("转换为行", ParDo.of(new DoFn<String, Row>() {
            @ProcessElement
            public void processElement(@Element String line, OutputReceiver<Row> out) {

                String[] values = line.split(",");
                Row row = Row.withSchema(SCHEMA)
                        .addValues(values[0], Short.parseShort(values[1]), DateTime.now())
                        .build();
                out.output(row);
            }
        })).setRowSchema(SCHEMA);

        rows.apply("写入 ClickHouse",
                        ClickHouseIO.write("jdbc:clickhouse://localhost:8123/default?user=default&password=******", "test_table"));

        // 运行管道。
        p.run().waitUntilFinish();
    }
}

支持的数据类型

ClickHouseApache Beam是否支持说明
TableSchema.TypeName.FLOAT32Schema.TypeName#FLOAT
TableSchema.TypeName.FLOAT64Schema.TypeName#DOUBLE
TableSchema.TypeName.INT8Schema.TypeName#BYTE
TableSchema.TypeName.INT16Schema.TypeName#INT16
TableSchema.TypeName.INT32Schema.TypeName#INT32
TableSchema.TypeName.INT64Schema.TypeName#INT64
TableSchema.TypeName.STRINGSchema.TypeName#STRING
TableSchema.TypeName.UINT8Schema.TypeName#INT16
TableSchema.TypeName.UINT16Schema.TypeName#INT32
TableSchema.TypeName.UINT32Schema.TypeName#INT64
TableSchema.TypeName.UINT64Schema.TypeName#INT64
TableSchema.TypeName.DATESchema.TypeName#DATETIME
TableSchema.TypeName.DATETIMESchema.TypeName#DATETIME
TableSchema.TypeName.ARRAYSchema.TypeName#ARRAY
TableSchema.TypeName.ENUM8Schema.TypeName#STRING
TableSchema.TypeName.ENUM16Schema.TypeName#STRING
TableSchema.TypeName.BOOLSchema.TypeName#BOOLEAN
TableSchema.TypeName.TUPLESchema.TypeName#ROW
TableSchema.TypeName.FIXEDSTRINGFixedBytesFixedBytes 是一种 LogicalType,表示固定长度的
字节数组,定义在
org.apache.beam.sdk.schemas.logicaltypes 包中
Schema.TypeName#DECIMAL
Schema.TypeName#MAP

ClickHouseIO.Write 参数

可以使用以下 setter 函数来调整 ClickHouseIO.Write 的配置:

参数设置函数参数类型默认值描述
withMaxInsertBlockSize(long maxInsertBlockSize)1000000每次插入的数据块的最大行数。
withMaxRetries(int maxRetries)5插入失败时的最大重试次数。
withMaxCumulativeBackoff(Duration maxBackoff)Duration.standardDays(1000)重试时允许的最大累计退避时长。
withInitialBackoff(Duration initialBackoff)Duration.standardSeconds(5)第一次重试前的初始退避时长。
withInsertDistributedSync(Boolean sync)true若为 true,则对分布式表的插入操作以同步方式执行。
withInsertQuorum(Long quorum)null确认一次插入操作所需的副本数量。
withInsertDeduplicate(Boolean deduplicate)true若为 true,则对插入操作启用去重。
withTableSchema(TableSchema schema)null目标 ClickHouse 表的表结构(schema)。

限制

使用该连接器时请注意以下限制:

  • 截至目前,仅支持 Sink 操作。该连接器不支持 Source 操作。
  • 在向 ReplicatedMergeTree 或基于 ReplicatedMergeTree 构建的 Distributed 表中插入数据时,ClickHouse 会执行去重操作。如果未启用复制,向普通 MergeTree 表插入数据时,当一次插入失败并随后重试成功时,可能会产生重复数据。不过,每个数据块的插入是原子性的,并且可以使用 ClickHouseIO.Write.withMaxInsertBlockSize(long) 配置块大小。去重是通过插入数据块的校验和来实现的。有关去重的更多信息,请访问 去重插入去重配置
  • 该连接器不会执行任何 DDL 语句;因此,在执行插入之前,目标表必须已经存在。