メインコンテンツへスキップ
メインコンテンツへスキップ

Kafka と ClickHouse で Vector を利用する

Kafka と ClickHouse で Vector を使用する

Vector はベンダー非依存のデータパイプラインであり、Kafka からデータを読み取り、ClickHouse にイベントを送信できます。

ClickHouse と組み合わせた Vector の入門ガイドでは、ログのユースケースとファイルからのイベント読み取りに焦点を当てています。ここでは、Kafka トピックに格納されたイベントを含む GitHub サンプルデータセットを利用します。

Vector は、プッシュまたはプルモデルでデータを取得するために sources を利用します。一方で sinks はイベントの送信先を提供します。したがって、Kafka source と ClickHouse sink を使用します。なお、Kafka は sink としてサポートされていますが、ClickHouse source は利用できません。その結果、Vector は ClickHouse から Kafka へデータを転送したいユーザーには適していません。

Vector はデータの変換にも対応していますが、これは本ガイドの範囲外です。この機能が必要な場合は、Vector のドキュメントを参照してください。

現在の ClickHouse sink の実装では HTTP インターフェースを利用している点に注意してください。ClickHouse sink は現時点では JSON スキーマの利用をサポートしていません。データはプレーンな JSON 形式、もしくは文字列として Kafka に送信される必要があります。

ライセンス

Vector は MPL-2.0 License の下で配布されています。

接続情報を収集する

HTTP(S) で ClickHouse に接続するには、次の情報が必要です。

Parameter(s)Description
HOST and PORT通常、TLS を使用する場合のポートは 8443、TLS を使用しない場合のポートは 8123 です。
DATABASE NAME既定で default という名前のデータベースが用意されています。接続したいデータベースの名前を使用してください。
USERNAME and PASSWORD既定のユーザー名は default です。用途に応じて適切なユーザー名を使用してください。

ClickHouse Cloud サービスに関する詳細情報は、ClickHouse Cloud コンソールで確認できます。 サービスを選択し、Connect をクリックします。

ClickHouse Cloud サービスの Connect ボタン

HTTPS を選択します。接続情報は、サンプルの curl コマンド内に表示されます。

ClickHouse Cloud HTTPS 接続詳細

自己管理型の ClickHouse を使用している場合、接続情報は ClickHouse 管理者によって設定されます。

手順

  1. Kafka に github トピックを作成し、GitHub データセットを投入します。
cat /opt/data/github/github_all_columns.ndjson | kcat -b <host>:<port> -X security.protocol=sasl_ssl -X sasl.mechanisms=PLAIN -X sasl.username=<username> -X sasl.password=<password> -t github

このデータセットは、ClickHouse/ClickHouse リポジトリに焦点を当てた 200,000 行で構成されています。

  1. 対象テーブルが作成されていることを確認します。ここではデフォルトデータベースを使用します。

CREATE TABLE github ( file_time DateTime, event_type Enum('CommitCommentEvent' = 1, 'CreateEvent' = 2, 'DeleteEvent' = 3, 'ForkEvent' = 4, 'GollumEvent' = 5, 'IssueCommentEvent' = 6, 'IssuesEvent' = 7, 'MemberEvent' = 8, 'PublicEvent' = 9, 'PullRequestEvent' = 10, 'PullRequestReviewCommentEvent' = 11, 'PushEvent' = 12, 'ReleaseEvent' = 13, 'SponsorshipEvent' = 14, 'WatchEvent' = 15, 'GistEvent' = 16, 'FollowEvent' = 17, 'DownloadEvent' = 18, 'PullRequestReviewEvent' = 19, 'ForkApplyEvent' = 20, 'Event' = 21, 'TeamAddEvent' = 22), actor_login LowCardinality(String), repo_name LowCardinality(String), created_at DateTime, updated_at DateTime, action Enum('none' = 0, 'created' = 1, 'added' = 2, 'edited' = 3, 'deleted' = 4, 'opened' = 5, 'closed' = 6, 'reopened' = 7, 'assigned' = 8, 'unassigned' = 9, 'labeled' = 10, 'unlabeled' = 11, 'review_requested' = 12, 'review_request_removed' = 13, 'synchronize' = 14, 'started' = 15, 'published' = 16, 'update' = 17, 'create' = 18, 'fork' = 19, 'merged' = 20), comment_id UInt64, path String, ref LowCardinality(String), ref_type Enum('none' = 0, 'branch' = 1, 'tag' = 2, 'repository' = 3, 'unknown' = 4), creator_user_login LowCardinality(String), number UInt32, title String, labels Array(LowCardinality(String)), state Enum('none' = 0, 'open' = 1, 'closed' = 2), assignee LowCardinality(String), assignees Array(LowCardinality(String)), closed_at DateTime, merged_at DateTime, merge_commit_sha String, requested_reviewers Array(LowCardinality(String)), merged_by LowCardinality(String), review_comments UInt32, member_login LowCardinality(String) ) ENGINE = MergeTree ORDER BY (event_type, repo_name, created_at);


3. [Vectorをダウンロードしてインストールします](https://vector.dev/docs/setup/quickstart/)。`kafka.toml`設定ファイルを作成し、KafkaおよびClickHouseインスタンスに合わせて値を変更してください。

```toml
[sources.github]
type = "kafka"
auto_offset_reset = "smallest"
bootstrap_servers = "<kafka_host>:<kafka_port>"
group_id = "vector"
topics = [ "github" ]
tls.enabled = true
sasl.enabled = true
sasl.mechanism = "PLAIN"
sasl.username = "<username>"
sasl.password = "<password>"
decoding.codec = "json"

[sinks.clickhouse]
type = "clickhouse"
inputs = ["github"]
endpoint = "http://localhost:8123"
database = "default"
table = "github"
skip_unknown_fields = true
auth.strategy = "basic"
auth.user = "username"
auth.password = "password"
buffer.max_events = 10000
batch.timeout_secs = 1

この設定および Vector の動作について、いくつか重要な注意点があります。

  • この例は Confluent Cloud に対してテストされています。そのため、sasl.* および ssl.enabled セキュリティオプションは、セルフマネージドなケースでは適切でない可能性があります。
  • 設定パラメータ bootstrap_servers にはプロトコルのプレフィックスは不要です(例: pkc-2396y.us-east-1.aws.confluent.cloud:9092)。
  • ソースパラメータ decoding.codec = "json" は、メッセージが単一の JSON オブジェクトとして ClickHouse sink に渡されることを保証します。メッセージを文字列として扱い、デフォルト値の bytes を使用する場合、メッセージの内容はフィールド message に格納されます。多くの場合、これは Vector getting started ガイドで説明しているように、ClickHouse 側での処理が必要になります。
  • Vector はメッセージに対して多数のフィールドを追加します。この例では、ClickHouse sink の設定パラメータ skip_unknown_fields = true によって、これらのフィールドを無視しています。これは、ターゲットテーブルのスキーマに含まれないフィールドを無視する設定です。offset のようなこれらのメタフィールドが追加されるように、スキーマを調整してもかまいません。
  • inputs パラメータによって、sink がイベントのソースを参照している点に注目してください。
  • ClickHouse sink の動作についてはこちらを参照してください。スループットを最適化するため、buffer.max_eventsbatch.timeout_secsbatch.max_bytes パラメータのチューニングを検討してください。ClickHouse の推奨事項に従うと、1 バッチあたりのイベント数については、1000 を最小値として考慮する必要があります。スループットが一様に高いユースケースでは、buffer.max_events パラメータを増やすことができます。スループットにばらつきがある場合は、batch.timeout_secs パラメータの調整が必要になることがあります。
  • パラメータ auto_offset_reset = "smallest" は、Kafka ソースがトピックの先頭から読み取りを開始することを強制し、これによりステップ (1) で公開されたメッセージを確実に消費できるようにします。ユーザーによっては、異なる動作が必要になることがあります。詳細はこちらを参照してください。
  1. Vector を起動する
vector --config ./kafka.toml

デフォルトでは、ClickHouse への挿入処理が開始される前に、health check が必要です。これにより、接続が確立できることと、スキーマが読み取れることが保証されます。問題が発生した場合に役立つ、より詳細なログを取得するには、コマンドの前に VECTOR_LOG=debug を付けて実行してください。

  1. データが挿入されたことを確認します。
SELECT count() AS count FROM github;
件数
200000