跳到主要内容
跳到主要内容

AvroConfluent

输入输出别名

描述

Apache Avro 是一种面向行的序列化格式,使用二进制编码以实现高效的数据处理。AvroConfluent 格式支持解码使用 Confluent Schema Registry(或 API 兼容服务)序列化的、单对象且使用 Avro 编码的 Kafka 消息。

每条 Avro 消息都会嵌入一个模式 ID(schema ID),ClickHouse 会通过查询已配置的 Schema Registry 自动解析该 ID。模式解析完成后会被缓存,以获得最佳性能。

数据类型映射

下表展示了 Apache Avro 格式支持的所有数据类型,以及它们在 INSERTSELECT 查询中对应的 ClickHouse 数据类型

Avro 数据类型 INSERTClickHouse 数据类型Avro 数据类型 SELECT
boolean, int, long, float, doubleInt(8\16\32), UInt(8\16\32)int
boolean, int, long, float, doubleInt64, UInt64long
boolean, int, long, float, doubleFloat32float
boolean, int, long, float, doubleFloat64double
bytes, string, fixed, enumStringbytesstring *
bytes, string, fixedFixedString(N)fixed(N)
enumEnum(8\16)enum
array(T)Array(T)array(T)
map(V, K)Map(V, K)map(string, K)
union(null, T), union(T, null)Nullable(T)union(null, T)
union(T1, T2, …) **Variant(T1, T2, …)union(T1, T2, …) **
nullNullable(Nothing)null
int (date) ***Date, Date32int (date) ***
long (timestamp-millis) ***DateTime64(3)long (timestamp-millis) ***
long (timestamp-micros) ***DateTime64(6)long (timestamp-micros) ***
bytes (decimal) ***DateTime64(N)bytes (decimal) ***
intIPv4int
fixed(16)IPv6fixed(16)
bytes (decimal) ***Decimal(P, S)bytes (decimal) ***
string (uuid) ***UUIDstring (uuid) ***
fixed(16)Int128/UInt128fixed(16)
fixed(32)Int256/UInt256fixed(32)
recordTuplerecord

* 默认值为 bytes,此行为由设置 output_format_avro_string_column_pattern 控制

** Variant 类型 会隐式接受 null 作为字段值,因此,例如 Avro 的 union(T1, T2, null) 会被转换为 Variant(T1, T2)。 因此,当从 ClickHouse 生成 Avro 时,我们必须始终在 Avro 的 union 类型集合中包含 null 类型,因为在模式推断期间我们无法得知是否有任何值实际为 null

*** Avro 逻辑类型

不支持的 Avro 逻辑数据类型:

  • time-millis
  • time-micros
  • duration

格式设置

SettingDescriptionDefault
input_format_avro_allow_missing_fields指定在模式中找不到字段时,是否使用默认值而不是抛出错误。0
input_format_avro_null_as_default指定在向非空列插入 null 值时,是否使用默认值而不是抛出错误。0
format_avro_schema_registry_urlConfluent Schema Registry 的 URL。对于基本身份验证,可以在 URL 中直接包含经过 URL 编码的凭据。

示例

使用 schema registry

要使用 Kafka 表引擎 读取使用 Avro 编码的 Kafka 主题,请通过 format_avro_schema_registry_url 设置指定 schema registry 的 URL。

CREATE TABLE topic1_stream
(
    field1 String,
    field2 String
)
ENGINE = Kafka()
SETTINGS
kafka_broker_list = 'kafka-broker',
kafka_topic_list = 'topic1',
kafka_group_name = 'group1',
kafka_format = 'AvroConfluent',
format_avro_schema_registry_url = 'http://schema-registry-url';

SELECT * FROM topic1_stream;

使用基本身份验证

如果 schema registry 需要基本身份验证(例如使用 Confluent Cloud 时),可以在 format_avro_schema_registry_url 设置中提供经过 URL 编码的凭证。

CREATE TABLE topic1_stream
(
    field1 String,
    field2 String
)
ENGINE = Kafka()
SETTINGS
kafka_broker_list = 'kafka-broker',
kafka_topic_list = 'topic1',
kafka_group_name = 'group1',
kafka_format = 'AvroConfluent',
format_avro_schema_registry_url = 'https://<username>:<password>@schema-registry-url';

故障排查

要监控摄取进度并调试 Kafka 消费者的错误,可以查询 system.kafka_consumers 系统表。如果您的部署有多个副本(例如 ClickHouse Cloud),则必须使用 clusterAllReplicas 表函数。

SELECT * FROM clusterAllReplicas('default',system.kafka_consumers)
ORDER BY assignments.partition_id ASC;

如果遇到 schema 解析相关问题,可以使用 kafkacat 搭配 clickhouse-local 进行排查:

$ kafkacat -b kafka-broker  -C -t topic1 -o beginning -f '%s' -c 3 | clickhouse-local   --input-format AvroConfluent --format_avro_schema_registry_url 'http://schema-registry' -S "field1 Int64, field2 String"  -q 'select *  from table'
1 a
2 b
3 c