跳到主要内容
跳到主要内容

Protobuf

输入输出别名

描述

Protobuf 格式是 Protocol Buffers 格式。

此格式需要一个外部格式 schema,并且会在不同查询之间缓存。

ClickHouse 支持:

  • proto2proto3 两种语法。
  • Repeated/optional/required 字段。

为了确定表列与 Protocol Buffers 消息类型字段之间的对应关系,ClickHouse 会比较它们的名称。 该比较不区分大小写,并且将字符 _(下划线)和 .(点)视为相同。 如果表列类型与 Protocol Buffers 消息字段类型不同,则会应用必要的类型转换。

支持嵌套消息。例如,对于下列消息类型中的字段 z

message MessageType {
  message XType {
    message YType {
      int32 z;
    };
    repeated YType y;
  };
  XType x;
};

ClickHouse 会尝试查找名为 x.y.z(或 x_y_zX.y_Z 等)的列。

嵌套消息适合作为嵌套数据结构的输入或输出。

如下所示,在 protobuf 架构(schema)中定义的默认值不会被应用,而是会使用表默认值来代替它们:

syntax = "proto2";

message MessageType {
  optional int32 result_per_page = 3 [default = 10];
}

如果消息包含 oneof,并且设置了 input_format_protobuf_oneof_presence,ClickHouse 会填充一个列,用于指示该 oneof 中实际出现的是哪个字段。

syntax = "proto3";

message StringOrString {
  oneof string_oneof {
    string string1 = 1;
    string string2 = 42;
  }
}
CREATE TABLE string_or_string ( string1 String, string2 String, string_oneof Enum('no'=0, 'hello' = 1, 'world' = 42))  Engine=MergeTree ORDER BY tuple();
INSERT INTO string_or_string from INFILE '$CURDIR/data_protobuf/String1' SETTINGS format_schema='$SCHEMADIR/string_or_string.proto:StringOrString' FORMAT ProtobufSingle;
SELECT * FROM string_or_string
   ┌─────────┬─────────┬──────────────┐
   │ string1 │ string2 │ string_oneof │
   ├─────────┼─────────┼──────────────┤
1. │         │ string2 │ world        │
   ├─────────┼─────────┼──────────────┤
2. │ string1 │         │ hello        │
   └─────────┴─────────┴──────────────┘

表示存在性的列名必须与 oneof 的名称相同。支持嵌套消息(参见 basic-examples)。 允许的类型为 Int8、UInt8、Int16、UInt16、Int32、UInt32、Int64、UInt64、Enum、Enum8 或 Enum16。 Enum(以及 Enum8 或 Enum16)必须包含 oneof 的所有可能标签外加 0(用于表示不存在),其字符串表示形式无关紧要。

设置 input_format_protobuf_oneof_presence 默认是禁用的。

ClickHouse 以 length-delimited 格式输入和输出 protobuf 消息。 这意味着在每条消息之前,都应将其长度写为可变宽度整数(varint)

示例用法

读取和写入数据

示例文件

本示例中使用的文件可在 examples 仓库 中获取。

在本示例中,我们将把文件 protobuf_message.bin 中的一些数据读取到一个 ClickHouse 表中,然后使用 Protobuf 格式将其写回到名为 protobuf_message_from_clickhouse.bin 的文件中。

给定文件 schemafile.proto

syntax = "proto3";  
  
message MessageType {  
  string name = 1;  
  string surname = 2;  
  uint32 birthDate = 3;  
  repeated string phoneNumbers = 4;  
};
生成二进制文件

如果您已经了解如何以 Protobuf 格式序列化和反序列化数据,可以跳过此步骤。

我们将使用 Python 将一些数据序列化到 protobuf_message.bin 中,然后将其读入 ClickHouse。 如果您希望使用其他语言,请参阅:《如何在常用语言中读写按长度分隔的 Protobuf 消息》

运行以下命令,在与 schemafile.proto 相同的目录中生成一个名为 schemafile_pb2.py 的 Python 文件。 此文件包含表示 UserData Protobuf 消息的 Python 类:

protoc --python_out=. schemafile.proto

现在,在与 schemafile_pb2.py 相同的目录中创建一个名为 generate_protobuf_data.py 的新 Python 文件, 并将以下代码粘贴到该文件中:

import schemafile_pb2  # Module generated by 'protoc'
from google.protobuf import text_format
from google.protobuf.internal.encoder import _VarintBytes # Import the internal varint encoder

def create_user_data_message(name, surname, birthDate, phoneNumbers):
    """
    Creates and populates a UserData Protobuf message.
    """
    message = schemafile_pb2.MessageType()
    message.name = name
    message.surname = surname
    message.birthDate = birthDate
    message.phoneNumbers.extend(phoneNumbers)
    return message

# The data for our example users
data_to_serialize = [
    {"name": "Aisha", "surname": "Khan", "birthDate": 19920815, "phoneNumbers": ["(555) 247-8903", "(555) 612-3457"]},
    {"name": "Javier", "surname": "Rodriguez", "birthDate": 20001015, "phoneNumbers": ["(555) 891-2046", "(555) 738-5129"]},
    {"name": "Mei", "surname": "Ling", "birthDate": 19980616, "phoneNumbers": ["(555) 956-1834", "(555) 403-7682"]},
]

output_filename = "protobuf_messages.bin"

# Open the binary file in write-binary mode ('wb')
with open(output_filename, "wb") as f:
    for item in data_to_serialize:
        # Create a Protobuf message instance for the current user
        message = create_user_data_message(
            item["name"],
            item["surname"],
            item["birthDate"],
            item["phoneNumbers"]
        )

        # Serialize the message
        serialized_data = message.SerializeToString()

        # Get the length of the serialized data
        message_length = len(serialized_data)

        # Use the Protobuf library's internal _VarintBytes to encode the length
        length_prefix = _VarintBytes(message_length)

        # Write the length prefix
        f.write(length_prefix)
        # Write the serialized message data
        f.write(serialized_data)

print(f"Protobuf messages (length-delimited) written to {output_filename}")

# --- Optional: Verification (reading back and printing) ---
# For reading back, we'll also use the internal Protobuf decoder for varints.
from google.protobuf.internal.decoder import _DecodeVarint32

print("\n--- Verifying by reading back ---")
with open(output_filename, "rb") as f:
    buf = f.read() # Read the whole file into a buffer for easier varint decoding
    n = 0
    while n < len(buf):
        # Decode the varint length prefix
        msg_len, new_pos = _DecodeVarint32(buf, n)
        n = new_pos
        
        # Extract the message data
        message_data = buf[n:n+msg_len]
        n += msg_len

        # Parse the message
        decoded_message = schemafile_pb2.MessageType()
        decoded_message.ParseFromString(message_data)
        print(text_format.MessageToString(decoded_message, as_utf8=True))

现在从命令行运行该脚本。建议在 Python 虚拟环境中运行,例如使用 uv

uv venv proto-venv
source proto-venv/bin/activate

您需要安装以下 Python 库:

uv pip install --upgrade protobuf

运行脚本以生成二进制文件:

python generate_protobuf_data.py

创建一个与该 schema 匹配的 ClickHouse 表:

CREATE DATABASE IF NOT EXISTS test;
CREATE TABLE IF NOT EXISTS test.protobuf_messages (
  name String,
  surname String,
  birthDate UInt32,
  phoneNumbers Array(String)
)
ENGINE = MergeTree()
ORDER BY tuple()

在命令行中将数据插入表中:

cat protobuf_messages.bin | clickhouse-client --query "INSERT INTO test.protobuf_messages SETTINGS format_schema='schemafile:MessageType' FORMAT Protobuf"

你还可以使用 Protobuf 格式将数据写回二进制文件中:

SELECT * FROM test.protobuf_messages INTO OUTFILE 'protobuf_message_from_clickhouse.bin' FORMAT Protobuf SETTINGS format_schema = 'schemafile:MessageType'

有了你的 Protobuf 模式定义,你现在可以对 ClickHouse 写入到文件 protobuf_message_from_clickhouse.bin 的数据进行反序列化了。

使用 ClickHouse Cloud 读取和写入数据

在 ClickHouse Cloud 中,您无法上传 Protobuf schema 文件。不过,您可以使用 format_protobuf_schema 设置项在查询中指定该 schema。下面的示例演示如何从本地机器读取序列化数据,并将其插入 ClickHouse Cloud 中的一张表。

与前一个示例类似,请在 ClickHouse Cloud 中根据 Protobuf schema 定义的结构创建表:

CREATE DATABASE IF NOT EXISTS test;
CREATE TABLE IF NOT EXISTS test.protobuf_messages (
  name String,
  surname String,
  birthDate UInt32,
  phoneNumbers Array(String)
)
ENGINE = MergeTree()
ORDER BY tuple()

format_schema_source 设置用于定义 format_schema 的来源。

可能的取值:

  • 'file'(默认):在 Cloud 中不支持
  • 'string':format_schema 为 schema 的字面内容。
  • 'query':format_schema 为用于获取 schema 的查询语句。

format_schema_source='string'

要将数据插入 ClickHouse Cloud,并将 schema 以字符串形式指定时,请运行:

cat protobuf_messages.bin | clickhouse client --host <主机名> --secure --password <密码> --query "INSERT INTO testing.protobuf_messages SETTINGS format_schema_source='syntax = "proto3";message MessageType {  string name = 1;  string surname = 2;  uint32 birthDate = 3;  repeated string phoneNumbers = 4;};', format_schema='schemafile:MessageType' FORMAT Protobuf"

选择已插入表中的数据:

clickhouse client --host <主机名> --secure --password <密码> --query "SELECT * FROM testing.protobuf_messages"
Aisha Khan 19920815 ['(555) 247-8903','(555) 612-3457']
Javier Rodriguez 20001015 ['(555) 891-2046','(555) 738-5129']
Mei Ling 19980616 ['(555) 956-1834','(555) 403-7682']

format_schema_source='query'

你还可以将 Protobuf schema 存储在一张表中。

在 ClickHouse Cloud 上创建一张用于插入数据的表:

CREATE TABLE testing.protobuf_schema (
  schema String
)
ENGINE = MergeTree()
ORDER BY tuple();
INSERT INTO testing.protobuf_schema VALUES ('syntax = "proto3";message MessageType {  string name = 1;  string surname = 2;  uint32 birthDate = 3;  repeated string phoneNumbers = 4;};');

将数据插入 ClickHouse Cloud,并通过查询语句指定 schema:

cat protobuf_messages.bin | clickhouse client --host <主机名> --secure --password <密码> --query "INSERT INTO testing.protobuf_messages SETTINGS format_schema_source='SELECT schema FROM testing.protobuf_schema', format_schema='schemafile:MessageType' FORMAT Protobuf"

查询已插入到该表中的数据:

clickhouse client --host <主机名> --secure --password <密码> --query "SELECT * FROM testing.protobuf_messages"
Aisha Khan 19920815 ['(555) 247-8903','(555) 612-3457']
Javier Rodriguez 20001015 ['(555) 891-2046','(555) 738-5129']
Mei Ling 19980616 ['(555) 956-1834','(555) 403-7682']

使用自动生成的 schema

如果你的数据没有外部的 Protobuf schema,仍然可以使用自动生成的 schema 以 Protobuf 格式输出/输入数据。为此,请使用 format_protobuf_use_autogenerated_schema 设置。

例如:

SELECT * FROM test.hits format Protobuf SETTINGS format_protobuf_use_autogenerated_schema=1

在这种情况下,ClickHouse 会根据表结构,使用函数 structureToProtobufSchema 自动生成 Protobuf 模式。然后它会使用该模式以 Protobuf 格式序列化数据。

你也可以使用自动生成的模式来读取 Protobuf 文件。在这种情况下,必须使用相同的模式来创建该文件:

$ cat hits.bin | clickhouse-client --query "INSERT INTO test.hits SETTINGS format_protobuf_use_autogenerated_schema=1 FORMAT Protobuf"

设置 format_protobuf_use_autogenerated_schema 默认启用,并且在未设置 format_schema 时生效。

你还可以在输入/输出期间,通过设置 output_format_schema 将自动生成的 schema 保存到文件中。例如:

SELECT * FROM test.hits format Protobuf SETTINGS format_protobuf_use_autogenerated_schema=1, output_format_schema='path/to/schema/schema.proto'

在这种情况下,会将自动生成的 Protobuf schema 保存在文件 path/to/schema/schema.capnp 中。

删除 Protobuf 缓存

要重新加载从 format_schema_path 加载的 Protobuf 架构,请使用 SYSTEM DROP ... FORMAT CACHE 语句。

SYSTEM DROP FORMAT SCHEMA CACHE FOR Protobuf