メインコンテンツへスキップ
メインコンテンツへスキップ

AvroConfluent

入力出力エイリアス

説明

Apache Avro は、効率的なデータ処理のためにバイナリエンコードを使用する行指向のシリアル化フォーマットです。AvroConfluent フォーマットは、Confluent Schema Registry(またはその API 互換サービス)を用いてシリアル化された、単一オブジェクト形式の Avro でエンコードされた Kafka メッセージのデコードをサポートします。

各 Avro メッセージにはスキーマ ID が埋め込まれており、ClickHouse は設定済みの Schema Registry に対してクエリを実行することで自動的にスキーマを解決します。一度解決されたスキーマは、パフォーマンスを最適化するためにキャッシュされます。

データ型のマッピング

次の表は、Apache Avro 形式でサポートされているすべてのデータ型と、INSERT クエリおよび SELECT クエリにおける対応する ClickHouse のデータ型を示しています。

Avro データ型 INSERTClickHouse データ型Avro データ型 SELECT
boolean, int, long, float, doubleInt(8\16\32), UInt(8\16\32)int
boolean, int, long, float, doubleInt64, UInt64long
boolean, int, long, float, doubleFloat32float
boolean, int, long, float, doubleFloat64double
bytes, string, fixed, enumStringbytes または string *
bytes, string, fixedFixedString(N)fixed(N)
enumEnum(8\16)enum
array(T)Array(T)array(T)
map(V, K)Map(V, K)map(string, K)
union(null, T), union(T, null)Nullable(T)union(null, T)
union(T1, T2, …) **Variant(T1, T2, …)union(T1, T2, …) **
nullNullable(Nothing)null
int (date) ***Date, Date32int (date) ***
long (timestamp-millis) ***DateTime64(3)long (timestamp-millis) ***
long (timestamp-micros) ***DateTime64(6)long (timestamp-micros) ***
bytes (decimal) ***DateTime64(N)bytes (decimal) ***
intIPv4int
fixed(16)IPv6fixed(16)
bytes (decimal) ***Decimal(P, S)bytes (decimal) ***
string (uuid) ***UUIDstring (uuid) ***
fixed(16)Int128/UInt128fixed(16)
fixed(32)Int256/UInt256fixed(32)
recordTuplerecord

* bytes がデフォルトであり、この挙動は設定 output_format_avro_string_column_pattern によって制御されます

** Variant type はフィールド値として暗黙的に null を受け入れるため、たとえば Avro の union(T1, T2, null)Variant(T1, T2) に変換されます。 その結果、ClickHouse から Avro を生成する際には、スキーマ推論中に任意の値が実際に null かどうかを判断できないため、常に Avro の union 型の集合に null 型を含める必要があります。

*** Avro logical types

サポートされていない Avro の論理データ型:

  • time-millis
  • time-micros
  • duration

フォーマット設定

SettingDescriptionDefault
input_format_avro_allow_missing_fieldsスキーマ内にフィールドが見つからない場合にエラーを発生させる代わりに、デフォルト値を使用するかどうか。0
input_format_avro_null_as_defaultNULL を許容しないカラムに null 値を挿入する際にエラーを発生させる代わりに、デフォルト値を使用するかどうか。0
format_avro_schema_registry_urlConfluent Schema Registry の URL。Basic 認証を利用する場合は、URL エンコードした認証情報を URL のパス部分に直接含めることができます。

スキーマレジストリの使用

Kafka table engine を使用して Avro でエンコードされた Kafka トピックを読み取るには、format_avro_schema_registry_url 設定を使用してスキーマレジストリの URL を指定します。

CREATE TABLE topic1_stream
(
    field1 String,
    field2 String
)
ENGINE = Kafka()
SETTINGS
kafka_broker_list = 'kafka-broker',
kafka_topic_list = 'topic1',
kafka_group_name = 'group1',
kafka_format = 'AvroConfluent',
format_avro_schema_registry_url = 'http://schema-registry-url';

SELECT * FROM topic1_stream;

Basic 認証の使用

スキーマレジストリが Basic 認証を必要とする場合(例:Confluent Cloud を使用している場合)、format_avro_schema_registry_url 設定に URL エンコード済みの認証情報を指定できます。

CREATE TABLE topic1_stream
(
    field1 String,
    field2 String
)
ENGINE = Kafka()
SETTINGS
kafka_broker_list = 'kafka-broker',
kafka_topic_list = 'topic1',
kafka_group_name = 'group1',
kafka_format = 'AvroConfluent',
format_avro_schema_registry_url = 'https://<username>:<password>@schema-registry-url';

トラブルシューティング

インジェスト処理の進行状況を監視し、Kafka コンシューマーで発生するエラーをデバッグするには、system.kafka_consumers システムテーブルに対してクエリを実行できます。デプロイメントに複数のレプリカがある場合(例:ClickHouse Cloud)、clusterAllReplicas テーブル関数を使用する必要があります。

SELECT * FROM clusterAllReplicas('default',system.kafka_consumers)
ORDER BY assignments.partition_id ASC;

スキーマの解決で問題が発生した場合は、kafkacatclickhouse-local を使用してトラブルシューティングできます。

$ kafkacat -b kafka-broker  -C -t topic1 -o beginning -f '%s' -c 3 | clickhouse-local   --input-format AvroConfluent --format_avro_schema_registry_url 'http://schema-registry' -S "field1 Int64, field2 String"  -q 'select *  from table'
1 a
2 b
3 c