跳到主要内容
跳到主要内容

将 Vector 与 Kafka 和 ClickHouse 集成使用

在 Kafka 和 ClickHouse 中使用 Vector

Vector 是一个与厂商无关的数据管道,能够从 Kafka 读取数据并将事件发送到 ClickHouse。

针对 Vector 与 ClickHouse 的入门指南重点关注日志使用场景以及从文件中读取事件。我们使用包含在 Kafka topic 中事件的 GitHub 示例数据集

Vector 使用 sources 通过推送或拉取模型来获取数据。sinks 则为事件提供目标位置。因此,我们在此使用 Kafka source 和 ClickHouse sink。请注意,尽管 Kafka 支持作为 sink 使用,但目前尚无 ClickHouse source。因此,对于希望将数据从 ClickHouse 传输到 Kafka 的用户来说,Vector 并不适用。

Vector 还支持对数据进行转换。这超出了本指南的范围。如果用户需要在其数据集上进行数据转换,请参考 Vector 文档。

请注意,当前 ClickHouse sink 的实现使用的是 HTTP 接口。ClickHouse sink 目前不支持使用 JSON schema。数据必须以纯 JSON 格式或字符串形式发布到 Kafka。

许可证

Vector 根据 MPL-2.0 License 进行分发。

收集连接信息

要通过 HTTP(S) 连接到 ClickHouse,您需要以下信息:

参数说明
HOSTPORT通常,在使用 TLS 时端口为 8443,不使用 TLS 时端口为 8123。
DATABASE NAME默认提供一个名为 default 的数据库,请填写您要连接的目标数据库名称。
USERNAMEPASSWORD默认用户名为 default。请使用适合您使用场景的用户名。

您的 ClickHouse Cloud 服务的详细信息可以在 ClickHouse Cloud 控制台中查看。 选择某个服务并点击 Connect

ClickHouse Cloud 服务 Connect 按钮

选择 HTTPS。连接信息会显示在示例 curl 命令中。

ClickHouse Cloud HTTPS 连接信息

如果您使用的是自托管 ClickHouse,则连接信息由您的 ClickHouse 管理员进行设置。

步骤

  1. 创建 Kafka github topic 并写入 GitHub 数据集
cat /opt/data/github/github_all_columns.ndjson | kcat -b <host>:<port> -X security.protocol=sasl_ssl -X sasl.mechanisms=PLAIN -X sasl.username=<username> -X sasl.password=<password> -t github

该数据集包含 200,000 行,聚焦于 ClickHouse/ClickHouse 仓库。

  1. 确保目标表已创建。下面我们将使用默认数据库。

CREATE TABLE github ( file_time DateTime, event_type Enum('CommitCommentEvent' = 1, 'CreateEvent' = 2, 'DeleteEvent' = 3, 'ForkEvent' = 4, 'GollumEvent' = 5, 'IssueCommentEvent' = 6, 'IssuesEvent' = 7, 'MemberEvent' = 8, 'PublicEvent' = 9, 'PullRequestEvent' = 10, 'PullRequestReviewCommentEvent' = 11, 'PushEvent' = 12, 'ReleaseEvent' = 13, 'SponsorshipEvent' = 14, 'WatchEvent' = 15, 'GistEvent' = 16, 'FollowEvent' = 17, 'DownloadEvent' = 18, 'PullRequestReviewEvent' = 19, 'ForkApplyEvent' = 20, 'Event' = 21, 'TeamAddEvent' = 22), actor_login LowCardinality(String), repo_name LowCardinality(String), created_at DateTime, updated_at DateTime, action Enum('none' = 0, 'created' = 1, 'added' = 2, 'edited' = 3, 'deleted' = 4, 'opened' = 5, 'closed' = 6, 'reopened' = 7, 'assigned' = 8, 'unassigned' = 9, 'labeled' = 10, 'unlabeled' = 11, 'review_requested' = 12, 'review_request_removed' = 13, 'synchronize' = 14, 'started' = 15, 'published' = 16, 'update' = 17, 'create' = 18, 'fork' = 19, 'merged' = 20), comment_id UInt64, path String, ref LowCardinality(String), ref_type Enum('none' = 0, 'branch' = 1, 'tag' = 2, 'repository' = 3, 'unknown' = 4), creator_user_login LowCardinality(String), number UInt32, title String, labels Array(LowCardinality(String)), state Enum('none' = 0, 'open' = 1, 'closed' = 2), assignee LowCardinality(String), assignees Array(LowCardinality(String)), closed_at DateTime, merged_at DateTime, merge_commit_sha String, requested_reviewers Array(LowCardinality(String)), merged_by LowCardinality(String), review_comments UInt32, member_login LowCardinality(String) ) ENGINE = MergeTree ORDER BY (event_type, repo_name, created_at);


3. [下载并安装 Vector](https://vector.dev/docs/setup/quickstart/)。创建一个 `kafka.toml` 配置文件,并根据你的 Kafka 和 ClickHouse 实例调整其中的参数值。

```toml
[sources.github]
type = "kafka"
auto_offset_reset = "smallest"
bootstrap_servers = "<kafka_host>:<kafka_port>"
group_id = "vector"
topics = [ "github" ]
tls.enabled = true
sasl.enabled = true
sasl.mechanism = "PLAIN"
sasl.username = "<username>"
sasl.password = "<password>"
decoding.codec = "json"

[sinks.clickhouse]
type = "clickhouse"
inputs = ["github"]
endpoint = "http://localhost:8123"
database = "default"
table = "github"
skip_unknown_fields = true
auth.strategy = "basic"
auth.user = "username"
auth.password = "password"
buffer.max_events = 10000
batch.timeout_secs = 1

关于此配置和 Vector 行为,有几点重要说明:

  • 此示例已在 Confluent Cloud 上进行测试。因此,sasl.*ssl.enabled 安全选项在自行管理部署的场景中可能并不适用。
  • 配置参数 bootstrap_servers 不需要协议前缀,例如 pkc-2396y.us-east-1.aws.confluent.cloud:9092
  • source 端参数 decoding.codec = "json" 可确保消息作为单个 JSON 对象传递给 ClickHouse sink。如果将消息作为字符串处理并使用默认的 bytes 值,消息内容将被追加到字段 message 中。在大多数情况下,这需要在 ClickHouse 中进行进一步处理,如 Vector 入门 指南中所述。
  • Vector 会向消息添加若干字段。在我们的示例中,我们通过 ClickHouse sink 中的配置参数 skip_unknown_fields = true 忽略这些字段。该参数会忽略不属于目标表 schema 的字段。你可以根据需要调整 schema,以确保将 offset 等此类元字段添加进去。
  • 注意 sink 如何通过参数 inputs 引用事件的 source。
  • 注意 ClickHouse sink 的行为,如此处所述。为获得最佳吞吐量,用户可能需要调优 buffer.max_eventsbatch.timeout_secsbatch.max_bytes 参数。根据 ClickHouse 的建议,单个批次中的事件数量应至少为 1000。对于吞吐量持续较高的用例,可以增加参数 buffer.max_events。对于吞吐量波动较大的场景,可能需要调整参数 batch.timeout_secs
  • 参数 auto_offset_reset = "smallest" 会强制 Kafka source 从 topic 的起始位置开始读取——从而确保我们能消费在步骤 (1) 中发布的消息。用户可能需要不同的行为,更多详情参见此处
  1. 启动 Vector
vector --config ./kafka.toml

默认情况下,在开始向 ClickHouse 插入数据之前,需要先进行一次健康检查,以确保能够建立连接并读取 schema。将 VECTOR_LOG=debug 添加到命令前可以获取更详细的日志信息,在排查问题时会很有帮助。

  1. 确认数据已成功插入。
SELECT count() AS count FROM github;
数量
200000