メインコンテンツへスキップ
メインコンテンツへスキップ

ClickHouse と S3 の統合

S3 から ClickHouse にデータを挿入できるほか、S3 をエクスポート先としても利用できるため、「データレイク」アーキテクチャとの連携が可能になります。さらに、S3 は「コールド」ストレージ階層を提供し、ストレージとコンピュートの分離にも役立ちます。以下のセクションでは、New York City taxi データセットを用いて、S3 と ClickHouse 間でデータを移動する手順を示すとともに、主要な構成パラメータを明らかにし、パフォーマンスを最適化するためのヒントを紹介します。

S3 テーブル関数

s3 テーブル関数を使用すると、S3 互換ストレージからおよび S3 互換ストレージへファイルの読み取りと書き込みができます。構文の概要は次のとおりです。

s3(path, [aws_access_key_id, aws_secret_access_key,] [format, [structure, [compression]]])

where:

  • path — ファイルへのパスを含むバケットの URL。読み取り専用モードで次のワイルドカードをサポートします: *, ?, {abc,def}, {N..M}。ここで、N, M は数値、'abc', 'def' は文字列です。詳細は、パスでのワイルドカードの使用に関するドキュメントを参照してください。
  • format — ファイルのフォーマット
  • structure — テーブルの構造。書式は 'column1_name column1_type, column2_name column2_type, ...' です。
  • compression — 省略可能なパラメータ。サポートされている値: none, gzip/gz, brotli/br, xz/LZMA, zstd/zst。デフォルトでは、ファイル拡張子から圧縮形式を自動検出します。

パス式でワイルドカードを使用すると、複数のファイルを参照できるようになり、並列処理による読み取りが可能になります。

準備

ClickHouse でテーブルを作成する前に、まず S3 バケット内のデータを詳しく確認しておくとよいでしょう。これは、ClickHouse から直接 DESCRIBE ステートメントを使用して実行できます。

DESCRIBE TABLE s3('https://datasets-documentation.s3.eu-west-3.amazonaws.com/nyc-taxi/trips_*.gz', 'TabSeparatedWithNames');

DESCRIBE TABLE ステートメントの出力を見ると、S3 バケット内のデータを ClickHouse がどのように自動推論するかがわかります。また、gzip 圧縮形式も自動的に認識して解凍されることに注意してください。

DESCRIBE TABLE s3('https://datasets-documentation.s3.eu-west-3.amazonaws.com/nyc-taxi/trips_*.gz', 'TabSeparatedWithNames') SETTINGS describe_compact_output=1

┌─name──────────────────┬─type───────────────┐ │ trip_id │ Nullable(Int64) │ │ vendor_id │ Nullable(Int64) │ │ pickup_date │ Nullable(Date) │ │ pickup_datetime │ Nullable(DateTime) │ │ dropoff_date │ Nullable(Date) │ │ dropoff_datetime │ Nullable(DateTime) │ │ store_and_fwd_flag │ Nullable(Int64) │ │ rate_code_id │ Nullable(Int64) │ │ pickup_longitude │ Nullable(Float64) │ │ pickup_latitude │ Nullable(Float64) │ │ dropoff_longitude │ Nullable(Float64) │ │ dropoff_latitude │ Nullable(Float64) │ │ passenger_count │ Nullable(Int64) │ │ trip_distance │ Nullable(String) │ │ fare_amount │ Nullable(String) │ │ extra │ Nullable(String) │ │ mta_tax │ Nullable(String) │ │ tip_amount │ Nullable(String) │ │ tolls_amount │ Nullable(Float64) │ │ ehail_fee │ Nullable(Int64) │ │ improvement_surcharge │ Nullable(String) │ │ total_amount │ Nullable(String) │ │ payment_type │ Nullable(String) │ │ trip_type │ Nullable(Int64) │ │ pickup │ Nullable(String) │ │ dropoff │ Nullable(String) │ │ cab_type │ Nullable(String) │ │ pickup_nyct2010_gid │ Nullable(Int64) │ │ pickup_ctlabel │ Nullable(Float64) │ │ pickup_borocode │ Nullable(Int64) │ │ pickup_ct2010 │ Nullable(String) │ │ pickup_boroct2010 │ Nullable(String) │ │ pickup_cdeligibil │ Nullable(String) │ │ pickup_ntacode │ Nullable(String) │ │ pickup_ntaname │ Nullable(String) │ │ pickup_puma │ Nullable(Int64) │ │ dropoff_nyct2010_gid │ Nullable(Int64) │ │ dropoff_ctlabel │ Nullable(Float64) │ │ dropoff_borocode │ Nullable(Int64) │ │ dropoff_ct2010 │ Nullable(String) │ │ dropoff_boroct2010 │ Nullable(String) │ │ dropoff_cdeligibil │ Nullable(String) │ │ dropoff_ntacode │ Nullable(String) │ │ dropoff_ntaname │ Nullable(String) │ │ dropoff_puma │ Nullable(Int64) │ └───────────────────────┴────────────────────┘


S3ベースのデータセットを操作するために、標準的な`MergeTree`テーブルを宛先として準備します。以下のステートメントは、デフォルトデータベースに`trips`という名前のテーブルを作成します。上記で推論されたデータ型の一部を変更しており、特に[`Nullable()`](/sql-reference/data-types/nullable)データ型修飾子は使用しないようにしています。これは、不要な追加ストレージと若干のパフォーマンスオーバーヘッドを引き起こす可能性があるためです。
CREATE TABLE trips
(
    `trip_id` UInt32,
    `vendor_id` Enum8('1' = 1, '2' = 2, '3' = 3, '4' = 4, 'CMT' = 5, 'VTS' = 6, 'DDS' = 7, 'B02512' = 10, 'B02598' = 11, 'B02617' = 12, 'B02682' = 13, 'B02764' = 14, '' = 15),
    `pickup_date` Date,
    `pickup_datetime` DateTime,
    `dropoff_date` Date,
    `dropoff_datetime` DateTime,
    `store_and_fwd_flag` UInt8,
    `rate_code_id` UInt8,
    `pickup_longitude` Float64,
    `pickup_latitude` Float64,
    `dropoff_longitude` Float64,
    `dropoff_latitude` Float64,
    `passenger_count` UInt8,
    `trip_distance` Float64,
    `fare_amount` Float32,
    `extra` Float32,
    `mta_tax` Float32,
    `tip_amount` Float32,
    `tolls_amount` Float32,
    `ehail_fee` Float32,
    `improvement_surcharge` Float32,
    `total_amount` Float32,
    `payment_type` Enum8('UNK' = 0, 'CSH' = 1, 'CRE' = 2, 'NOC' = 3, 'DIS' = 4),
    `trip_type` UInt8,
    `pickup` FixedString(25),
    `dropoff` FixedString(25),
    `cab_type` Enum8('yellow' = 1, 'green' = 2, 'uber' = 3),
    `pickup_nyct2010_gid` Int8,
    `pickup_ctlabel` Float32,
    `pickup_borocode` Int8,
    `pickup_ct2010` String,
    `pickup_boroct2010` String,
    `pickup_cdeligibil` String,
    `pickup_ntacode` FixedString(4),
    `pickup_ntaname` String,
    `pickup_puma` UInt16,
    `dropoff_nyct2010_gid` UInt8,
    `dropoff_ctlabel` Float32,
    `dropoff_borocode` UInt8,
    `dropoff_ct2010` String,
    `dropoff_boroct2010` String,
    `dropoff_cdeligibil` String,
    `dropoff_ntacode` FixedString(4),
    `dropoff_ntaname` String,
    `dropoff_puma` UInt16
)
ENGINE = MergeTree
PARTITION BY toYYYYMM(pickup_date)
ORDER BY pickup_datetime

pickup_date フィールドに対するパーティショニングの利用に注目してください。通常、パーティションキーはデータ管理のために使用されますが、後ほどこのキーを使って S3 への書き込みを並列化します。

タクシーデータセットの各エントリは、1 件のタクシー乗車を表しています。この匿名化されたデータは、S3 バケット https://datasets-documentation.s3.eu-west-3.amazonaws.com/nyc-taxi フォルダ内に格納された、圧縮済みの 2,000 万件のレコードで構成されています。データは TSV 形式で、ファイルあたりおよそ 100 万行が含まれています。

S3 からデータを読み込む

ClickHouse に永続化することなく、S3 上のデータをソースとしてクエリできます。次のクエリでは、10 行だけをサンプリングします。バケットが公開されているため、ここでは認証情報が不要である点に注意してください。

SELECT *
FROM s3('https://datasets-documentation.s3.eu-west-3.amazonaws.com/nyc-taxi/trips_*.gz', 'TabSeparatedWithNames')
LIMIT 10;

TabSeparatedWithNames 形式では最初の行にカラム名が含まれているため、カラム名を明示的に指定する必要はないことに注意してください。CSVTSV などの他の形式では、このクエリに対して c1c2c3 のような自動生成されたカラムが返されます。

クエリではさらに、バケット内のパスおよびファイル名に関する情報をそれぞれ提供する _path_file のような仮想カラムもサポートしています。例えば:

SELECT  _path, _file, trip_id
FROM s3('https://datasets-documentation.s3.eu-west-3.amazonaws.com/nyc-taxi/trips_0.gz', 'TabSeparatedWithNames')
LIMIT 5;
┌─_path──────────────────────────────────────┬─_file──────┬────trip_id─┐
│ datasets-documentation/nyc-taxi/trips_0.gz │ trips_0.gz │ 1199999902 │
│ datasets-documentation/nyc-taxi/trips_0.gz │ trips_0.gz │ 1199999919 │
│ datasets-documentation/nyc-taxi/trips_0.gz │ trips_0.gz │ 1199999944 │
│ datasets-documentation/nyc-taxi/trips_0.gz │ trips_0.gz │ 1199999969 │
│ datasets-documentation/nyc-taxi/trips_0.gz │ trips_0.gz │ 1199999990 │
└────────────────────────────────────────────┴────────────┴────────────┘

このサンプルデータセットに含まれる行数を確認します。ファイル展開のためにワイルドカードを使用しているため、20 個すべてのファイルが対象になります。ClickHouse インスタンス上のコア数にもよりますが、このクエリの実行にはおよそ 10 秒かかります。

SELECT count() AS count
FROM s3('https://datasets-documentation.s3.eu-west-3.amazonaws.com/nyc-taxi/trips_*.gz', 'TabSeparatedWithNames');
┌────count─┐
│ 20000000 │
└──────────┘

S3 から直接データを読み取る方法は、データのサンプリングやアドホックな探索的クエリを実行するには有用ですが、日常的に行うべきものではありません。本格的に運用する段階になったら、ClickHouse の MergeTree テーブルにデータをインポートしてください。

clickhouse-local の使用

clickhouse-local プログラムを使用すると、ClickHouse サーバーをデプロイしたり設定したりすることなく、ローカルファイルに対して高速な処理を実行できます。s3 テーブル関数を用いたクエリは、すべてこのユーティリティで実行できます。例えば、次のように実行します。

clickhouse-local --query "SELECT * FROM s3('https://datasets-documentation.s3.eu-west-3.amazonaws.com/nyc-taxi/trips_*.gz', 'TabSeparatedWithNames') LIMIT 10"

S3 からのデータ挿入

ClickHouse の機能を最大限に活用するために、次にデータを読み取り、インスタンスに挿入します。 これを行うために、s3 関数とシンプルな INSERT 文を組み合わせます。ターゲットテーブル側で必要な構造が定義されているため、列を列挙する必要はない点に注意してください。この場合、テーブルの DDL 文で指定された順序で列が並んでいる必要があります。列は SELECT 句内での位置に基づいてマッピングされます。1,000 万行すべてを挿入する処理には、ClickHouse インスタンスによっては数分かかる場合があります。以下では、すばやく結果を得るために 100 万行のみを挿入しています。必要に応じて、LIMIT 句や列の選択を調整して、一部だけをインポートしてください。

INSERT INTO trips
   SELECT *
   FROM s3('https://datasets-documentation.s3.eu-west-3.amazonaws.com/nyc-taxi/trips_*.gz', 'TabSeparatedWithNames')
   LIMIT 1000000;

ClickHouse Local を使用したリモート挿入

ネットワークセキュリティポリシーによって ClickHouse クラスターからの外向き接続が禁止されている場合、clickhouse-local を使用して S3 データを挿入することもできます。以下の例では、S3 バケットから読み取り、remote 関数を使用して ClickHouse に挿入します。

clickhouse-local --query "INSERT INTO TABLE FUNCTION remote('localhost:9000', 'default.trips', 'username', 'password') (*) SELECT * FROM s3('https://datasets-documentation.s3.eu-west-3.amazonaws.com/nyc-taxi/trips_*.gz', 'TabSeparatedWithNames') LIMIT 10"
注記

これを安全な SSL 接続で実行するには、remoteSecure 関数を使用します。

データのエクスポート

s3 テーブル関数を使用して S3 上のファイルに書き込むことができます。これには適切な権限が必要です。必要な認証情報はリクエスト内で渡しますが、その他のオプションについては Managing Credentials ページを参照してください。

以下のシンプルな例では、テーブル関数をソースではなく出力先として使用しています。ここでは、trips テーブルから 10,000 行を、lz4 圧縮と出力形式としての CSV を指定して、S3 バケットにストリーミングしています。

INSERT INTO FUNCTION
   s3(
       'https://datasets-documentation.s3.eu-west-3.amazonaws.com/csv/trips.csv.lz4',
       's3_key',
       's3_secret',
       'CSV'
    )
SELECT *
FROM trips
LIMIT 10000;

ここでは、ファイルの形式が拡張子から自動的に判別されることに注目してください。また、s3 関数では列を指定する必要もありません。これは SELECT から自動的に推論されます。

大きなファイルの分割

データを 1 つのファイルとしてエクスポートしたいケースはあまりないでしょう。ClickHouse を含むほとんどのツールは、並列処理が可能になるため、複数ファイルへの読み書きを行うことでスループットが向上します。INSERT コマンドを複数回実行し、データのサブセットを対象にすることもできます。ClickHouse には、PARTITION キーを使用してファイルを自動的に分割する方法が用意されています。

次の例では、rand() 関数の剰余を用いて 10 個のファイルを作成します。生成されたパーティション ID がファイル名でどのように参照されているかに注目してください。これにより、trips_0.csv.lz4trips_1.csv.lz4 などのように、数値のサフィックスを持つ 10 個のファイルが生成されます。

INSERT INTO FUNCTION
   s3(
       'https://datasets-documentation.s3.eu-west-3.amazonaws.com/csv/trips_{_partition_id}.csv.lz4',
       's3_key',
       's3_secret',
       'CSV'
    )
    PARTITION BY rand() % 10
SELECT *
FROM trips
LIMIT 100000;

別の方法として、データ内のフィールドを参照することもできます。このデータセットでは、payment_type は 5 種類の値を持つ自然なパーティションキーになります。

INSERT INTO FUNCTION
   s3(
       'https://datasets-documentation.s3.eu-west-3.amazonaws.com/csv/trips_{_partition_id}.csv.lz4',
       's3_key',
       's3_secret',
       'CSV'
    )
    PARTITION BY payment_type
SELECT *
FROM trips
LIMIT 100000;

クラスターの活用

上記の関数はすべて、単一ノード上での実行に限定されています。読み取り速度は、他のリソース(通常はネットワーク)が飽和するまで CPU コア数に比例して向上し、ユーザーは垂直スケールが可能です。しかし、このアプローチには制約があります。ユーザーは INSERT INTO SELECT クエリを実行する際に分散テーブルに挿入することで、ある程度リソースの負荷を軽減できますが、それでも単一ノードでデータの読み取り、パース、処理を行う点は変わりません。この課題に対処し、読み取りを水平方向にスケールさせるために用意されているのが、s3Cluster 関数です。

クエリを受け取るノードはイニシエーターと呼ばれ、クラスター内のすべてのノードに接続を確立します。どのファイルを読み取る必要があるかを決定するグロブパターンは、読み取る対象となるファイルの集合へと解決されます。イニシエーターは、クラスター内のノード(ワーカーとして動作)にファイルを分配します。これらのワーカーは、読み取りを完了するごとに、処理するファイルを要求します。このプロセスにより、読み取りを水平方向にスケールさせることができます。

s3Cluster 関数は、対象となるクラスターをワーカーノードとして指定する必要がある点を除き、単一ノード版と同じ形式を取ります。

s3Cluster(cluster_name, source, [access_key_id, secret_access_key,] format, structure)
  • cluster_name — リモートおよびローカルサーバーへのアドレスと接続パラメータの集合を構成するために使用されるクラスタ名。
  • source — 単一または複数ファイルへの URL。読み取り専用モードで次のワイルドカードをサポートします: *, ?, {'abc','def'} および {N..M}。ここで N, M は数値、abc, def は文字列です。詳細は Wildcards In Path を参照してください。
  • access_key_idsecret_access_key — 指定されたエンドポイントで使用する認証情報を表すキー。省略可能です。
  • format — ファイルの format
  • structure — テーブルの構造。フォーマットは 'column1_name column1_type, column2_name column2_type, ...' です。

任意の s3 関数と同様に、バケットが保護されていない場合や、IAM ロールなどの環境経由で認証・権限付与を行っている場合は、認証情報は省略可能です。ただし s3 関数とは異なり、22.3.1 時点ではリクエスト内で structure を指定する必要があり、スキーマは自動推論されません。

この関数は、ほとんどのケースで INSERT INTO SELECT の一部として使用されます。この場合、多くは分散テーブルに対して INSERT を行います。以下では、trips_all が分散テーブルである単純な例を示します。このテーブルは events クラスタを使用していますが、読み取りおよび書き込みに使用されるノードの一貫性は必須要件ではありません。

INSERT INTO default.trips_all
   SELECT *
   FROM s3Cluster(
       'events',
       'https://datasets-documentation.s3.eu-west-3.amazonaws.com/nyc-taxi/trips_*.gz',
       'TabSeparatedWithNames'
    )

INSERT は initiator ノードに対して実行されます。つまり、読み取りは各ノードで行われますが、得られた行は分散処理のために initiator にルーティングされます。高スループットなシナリオでは、これがボトルネックとなる可能性があります。これに対処するには、s3cluster 関数に対してパラメータ parallel_distributed_insert_select を設定します。

S3 テーブルエンジン

s3 関数を使用すると、S3 に保存されたデータに対してアドホッククエリを実行できますが、構文が冗長になりがちです。この問題を解決するために用意されているのが、バケットの URL や認証情報を何度も指定する必要がなくなる S3 テーブルエンジンです。

CREATE TABLE s3_engine_table (name String, value UInt32)
    ENGINE = S3(path, [aws_access_key_id, aws_secret_access_key,] format, [compression])
    [SETTINGS ...]
  • path — ファイルへのパスを含むバケットの URL。読み取り専用モードでは、次のワイルドカードをサポートします: *?{abc,def}{N..M}。ここで N、M は数値、'abc'、'def' は文字列です。詳細はこちらを参照してください。
  • format — ファイルのフォーマット
  • aws_access_key_id, aws_secret_access_key - AWS アカウントユーザー用の長期認証情報。リクエストの認証に使用できます。このパラメータは省略可能です。認証情報が指定されていない場合は、設定ファイルの値が使用されます。詳細は認証情報の管理を参照してください。
  • compression — 圧縮形式。サポートされる値: none, gzip/gz, brotli/br, xz/LZMA, zstd/zst。パラメータは省略可能です。デフォルトでは、ファイル拡張子に基づいて圧縮方式を自動検出します。

データの読み取り

次の例では、https://datasets-documentation.s3.eu-west-3.amazonaws.com/nyc-taxi/ バケット内にある最初の 10 個の TSV ファイルを使用して、trips_raw という名前のテーブルを作成します。各ファイルには 100 万行が含まれます。

CREATE TABLE trips_raw
(
   `trip_id`               UInt32,
   `vendor_id`             Enum8('1' = 1, '2' = 2, '3' = 3, '4' = 4, 'CMT' = 5, 'VTS' = 6, 'DDS' = 7, 'B02512' = 10, 'B02598' = 11, 'B02617' = 12, 'B02682' = 13, 'B02764' = 14, '' = 15),
   `pickup_date`           Date,
   `pickup_datetime`       DateTime,
   `dropoff_date`          Date,
   `dropoff_datetime`      DateTime,
   `store_and_fwd_flag`    UInt8,
   `rate_code_id`          UInt8,
   `pickup_longitude`      Float64,
   `pickup_latitude`       Float64,
   `dropoff_longitude`     Float64,
   `dropoff_latitude`      Float64,
   `passenger_count`       UInt8,
   `trip_distance`         Float64,
   `fare_amount`           Float32,
   `extra`                 Float32,
   `mta_tax`               Float32,
   `tip_amount`            Float32,
   `tolls_amount`          Float32,
   `ehail_fee`             Float32,
   `improvement_surcharge` Float32,
   `total_amount`          Float32,
   `payment_type_`         Enum8('UNK' = 0, 'CSH' = 1, 'CRE' = 2, 'NOC' = 3, 'DIS' = 4),
   `trip_type`             UInt8,
   `pickup`                FixedString(25),
   `dropoff`               FixedString(25),
   `cab_type`              Enum8('yellow' = 1, 'green' = 2, 'uber' = 3),
   `pickup_nyct2010_gid`   Int8,
   `pickup_ctlabel`        Float32,
   `pickup_borocode`       Int8,
   `pickup_ct2010`         String,
   `pickup_boroct2010`     FixedString(7),
   `pickup_cdeligibil`     String,
   `pickup_ntacode`        FixedString(4),
   `pickup_ntaname`        String,
   `pickup_puma`           UInt16,
   `dropoff_nyct2010_gid`  UInt8,
   `dropoff_ctlabel`       Float32,
   `dropoff_borocode`      UInt8,
   `dropoff_ct2010`        String,
   `dropoff_boroct2010`    FixedString(7),
   `dropoff_cdeligibil`    String,
   `dropoff_ntacode`       FixedString(4),
   `dropoff_ntaname`       String,
   `dropoff_puma`          UInt16
) ENGINE = S3('https://datasets-documentation.s3.eu-west-3.amazonaws.com/nyc-taxi/trips_{0..9}.gz', 'TabSeparatedWithNames', 'gzip');

最初の10個のファイルに限定するために {0..9} パターンを使用している点に注意してください。作成されたら、このテーブルには他のテーブルと同様にクエリを実行できます。

SELECT DISTINCT(pickup_ntaname)
FROM trips_raw
LIMIT 10;

┌─pickup_ntaname───────────────────────────────────┐
│ Lenox Hill-Roosevelt Island                      │
│ Airport                                          │
│ SoHo-TriBeCa-Civic Center-Little Italy           │
│ West Village                                     │
│ Chinatown                                        │
│ Hudson Yards-Chelsea-Flatiron-Union Square       │
│ Turtle Bay-East Midtown                          │
│ Upper West Side                                  │
│ Murray Hill-Kips Bay                             │
│ DUMBO-Vinegar Hill-Downtown Brooklyn-Boerum Hill │
└──────────────────────────────────────────────────┘

データの挿入

S3 テーブルエンジンは並列読み出しをサポートします。書き込みは、テーブル定義にグロブパターンが含まれていない場合にのみサポートされます。そのため、上記のテーブルでは書き込みは行えません。

書き込みを示すために、書き込み可能な S3 バケットを参照するテーブルを作成します。

CREATE TABLE trips_dest
(
   `trip_id`               UInt32,
   `pickup_date`           Date,
   `pickup_datetime`       DateTime,
   `dropoff_datetime`      DateTime,
   `tip_amount`            Float32,
   `total_amount`          Float32
) ENGINE = S3('<bucket path>/trips.bin', 'Native');
INSERT INTO trips_dest
   SELECT
      trip_id,
      pickup_date,
      pickup_datetime,
      dropoff_datetime,
      tip_amount,
      total_amount
   FROM trips
   LIMIT 10;
SELECT * FROM trips_dest LIMIT 5;
┌────trip_id─┬─pickup_date─┬─────pickup_datetime─┬────dropoff_datetime─┬─tip_amount─┬─total_amount─┐
│ 1200018648 │  2015-07-01 │ 2015-07-01 00:00:16 │ 2015-07-01 00:02:57 │          0 │          7.3 │
│ 1201452450 │  2015-07-01 │ 2015-07-01 00:00:20 │ 2015-07-01 00:11:07 │       1.96 │        11.76 │
│ 1202368372 │  2015-07-01 │ 2015-07-01 00:00:40 │ 2015-07-01 00:05:46 │          0 │          7.3 │
│ 1200831168 │  2015-07-01 │ 2015-07-01 00:01:06 │ 2015-07-01 00:09:23 │          2 │         12.3 │
│ 1201362116 │  2015-07-01 │ 2015-07-01 00:01:07 │ 2015-07-01 00:03:31 │          0 │          5.3 │
└────────────┴─────────────┴─────────────────────┴─────────────────────┴────────────┴──────────────┘

行は新しいファイルにしか挿入できないことに注意してください。マージサイクルやファイル分割処理はありません。いったんファイルに書き込まれると、その後の挿入は失敗します。ユーザーには次の 2 つの選択肢があります。

  • 設定 s3_create_new_file_on_insert=1 を指定します。これにより、挿入ごとに新しいファイルが作成されます。各ファイルの末尾には数値のサフィックスが付加され、挿入操作ごとに単調増加します。上記の例では、後続の挿入を行うと trips_1.bin ファイルが作成されます。
  • 設定 s3_truncate_on_insert=1 を指定します。これによりファイルが切り詰められ、完了時には新たに挿入された行のみを含むようになります。

これら 2 つの設定のデフォルト値はいずれも 0 であるため、どちらか一方をユーザーが設定する必要があります。両方が設定されている場合は s3_truncate_on_insert が優先されます。

S3 テーブルエンジンについての注意点:

  • 従来の MergeTree ファミリのテーブルとは異なり、S3 テーブルを DROP しても、基盤となるデータは削除されません。
  • このテーブルタイプの完全な設定一覧は、こちらにあります。
  • このエンジンを使用する際は、次の点に注意してください:
    • ALTER クエリはサポートされません。
    • SAMPLE 操作はサポートされません。
    • 主キーやスキップインデックスといったインデックスの概念はありません。

認証情報の管理

前の例では、s3 関数または S3 テーブル定義の中で認証情報を渡してきました。これは単発の利用であれば許容できる場合もありますが、本番環境では、ユーザーは認証情報を明示的に記述しなくてもよい認証メカニズムを必要とします。これに対応するため、ClickHouse にはいくつかの選択肢があります。

  • 接続情報を config.xml または conf.d 配下の同等の設定ファイルに指定します。以下に、Debian パッケージを用いてインストールしたことを前提とした例ファイルの内容を示します。

    ubuntu@single-node-clickhouse:/etc/clickhouse-server/config.d$ cat s3.xml
    <clickhouse>
        <s3>
            <endpoint-name>
                <endpoint>https://dalem-files.s3.amazonaws.com/test/</endpoint>
                <access_key_id>key</access_key_id>
                <secret_access_key>secret</secret_access_key>
                <!-- <use_environment_credentials>false</use_environment_credentials> -->
                <!-- <header>Authorization: Bearer SOME-TOKEN</header> -->
            </endpoint-name>
        </s3>
    </clickhouse>
    

    これらの認証情報は、上記のエンドポイントが要求された URL の先頭部分と完全に一致するリクエストに対して使用されます。また、この例ではアクセスキーおよびシークレットキーの代わりに、認可ヘッダーを指定できることにも注目してください。サポートされている設定の完全な一覧はこちらにあります。

  • 上の例では、設定パラメータ use_environment_credentials を利用できることを示しています。この設定パラメータは、s3 レベルでグローバルに設定することもできます。

    <clickhouse>
        <s3>
        <use_environment_credentials>true</use_environment_credentials>
        </s3>
    </clickhouse>
    

    この設定を有効にすると、環境から S3 の認証情報を取得しようと試みるようになり、IAM ロールを通じてアクセスできるようになります。具体的には、次の順序で認証情報の取得が行われます。

    • 環境変数 AWS_ACCESS_KEY_IDAWS_SECRET_ACCESS_KEY および AWS_SESSION_TOKEN の参照
    • $HOME/.aws の確認
    • AWS Security Token Service 経由で取得した一時的な認証情報(AssumeRole API 経由)
    • ECS 環境変数 AWS_CONTAINER_CREDENTIALS_RELATIVE_URI または AWS_CONTAINER_CREDENTIALS_FULL_URI および AWS_ECS_CONTAINER_AUTHORIZATION_TOKEN に対する認証情報の確認
    • AWS_EC2_METADATA_DISABLED が true に設定されていない場合に、Amazon EC2 インスタンスメタデータ 経由で認証情報を取得
    • これらと同じ設定は、同じプレフィックスマッチング規則を用いて特定のエンドポイントに対しても設定できます。

パフォーマンス最適化

S3 関数を使用した読み取りおよび挿入の最適化方法については、専用のパフォーマンスガイドを参照してください。

S3 ストレージのチューニング

内部的には、ClickHouse の MergeTree は 2 つの主要なストレージ形式(WideCompact)を使用します。現在の実装では、ClickHouse のデフォルトの動作(min_bytes_for_wide_part および min_rows_for_wide_part 設定によって制御)に準拠していますが、今後のリリースでは S3 向けに動作が変化すると予想されます。例えば、min_bytes_for_wide_part のデフォルト値を大きくすることで、より Compact な形式が推奨され、その結果としてファイル数が減少します。S3 ストレージのみを使用する場合は、これらの設定の調整を検討してください。

S3 バックエンドの MergeTree

s3 関数と関連するテーブルエンジンを使用すると、なじみのある ClickHouse の構文で S3 上のデータをクエリできます。ただし、データ管理機能およびパフォーマンスの観点からは制限があります。プライマリインデックスのサポートがなく、キャッシュ機構のサポートもありません。また、ファイルの挿入(書き込み)はユーザーが管理する必要があります。

ClickHouse は、特に「コールド」データに対するクエリ性能がそれほど重要ではなく、ストレージとコンピュートを分離したい場合に、S3 が魅力的なストレージソリューションであることを認識しています。これを実現するために、MergeTree エンジンのストレージとして S3 を使用するためのサポートが提供されています。これにより、ユーザーは S3 のスケーラビリティとコストメリット、および MergeTree エンジンの挿入とクエリのパフォーマンスを活用できるようになります。

ストレージ階層

ClickHouse のストレージボリューム機能により、物理ディスクを MergeTree テーブルエンジンから抽象化できます。単一のボリュームは、順序付けられたディスクの集合で構成できます。これは主に複数のブロックデバイスをデータストレージに利用できるようにするためのものですが、この抽象化により S3 を含む他のストレージタイプも利用可能になります。ClickHouse のデータパーツは、ストレージポリシーに従ってボリューム間を移動したり、使用率に応じて移動したりできるため、ストレージ階層という概念が生まれます。

ストレージ階層によりホット・コールド アーキテクチャが可能になります。もっとも新しいデータは、通常もっとも頻繁にクエリされるため、高性能ストレージ(例: NVMe SSD)の比較的小さな容量のみを必要とします。データが古くなるにつれて、SLA で許容されるクエリ時間は長くなり、クエリ頻度も下がります。この裾野の広いロングテールのデータは、HDD のような低速で性能の低いストレージや、S3 のようなオブジェクトストレージに保存できます。

ディスクの作成

S3 バケットをディスクとして利用するには、まず ClickHouse の設定ファイル内で宣言する必要があります。config.xml を拡張するか、望ましくは conf.d 配下に新しいファイルを用意します。S3 ディスクの定義例を次に示します。

<clickhouse>
    <storage_configuration>
        ...
        <disks>
            <s3>
                <type>s3</type>
                <endpoint>https://sample-bucket.s3.us-east-2.amazonaws.com/tables/</endpoint>
                <access_key_id>your_access_key_id</access_key_id>
                <secret_access_key>your_secret_access_key</secret_access_key>
                <region></region>
                <metadata_path>/var/lib/clickhouse/disks/s3/</metadata_path>
            </s3>
            <s3_cache>
                <type>cache</type>
                <disk>s3</disk>
                <path>/var/lib/clickhouse/disks/s3_cache/</path>
                <max_size>10Gi</max_size>
            </s3_cache>
        </disks>
        ...
    </storage_configuration>
</clickhouse>

このディスク宣言に関連する設定の完全な一覧はこちらにあります。クレデンシャルは、Managing credentials で説明したのと同じ手法を用いてここで管理できます。たとえば、上記の設定ブロックで use_environment_credentials を true に設定することで、IAM ロールを使用できます。

ストレージポリシーの作成

一度設定すると、この「ディスク」はポリシー内で宣言されたストレージボリュームで使用できます。以下の例では、s3 が唯一のストレージであると仮定します。これは、TTL や使用率に基づいてデータを再配置できる、より複雑なホット・コールド構成は考慮していません。

<clickhouse>
    <storage_configuration>
        <disks>
            <s3>
            ...
            </s3>
            <s3_cache>
            ...
            </s3_cache>
        </disks>
        <policies>
            <s3_main>
                <volumes>
                    <main>
                        <disk>s3</disk>
                    </main>
                </volumes>
            </s3_main>
        </policies>
    </storage_configuration>
</clickhouse>

テーブルの作成

書き込み権限を持つバケットを使用するようにディスクを構成していると仮定すると、以下の例のようなテーブルを作成できるはずです。説明を簡潔にするため、NYC タクシー・データセットのカラムの一部のみを使用し、データを S3 をバックエンドとするテーブルへ直接ストリーミングします。

CREATE TABLE trips_s3
(
   `trip_id` UInt32,
   `pickup_date` Date,
   `pickup_datetime` DateTime,
   `dropoff_datetime` DateTime,
   `pickup_longitude` Float64,
   `pickup_latitude` Float64,
   `dropoff_longitude` Float64,
   `dropoff_latitude` Float64,
   `passenger_count` UInt8,
   `trip_distance` Float64,
   `tip_amount` Float32,
   `total_amount` Float32,
   `payment_type` Enum8('UNK' = 0, 'CSH' = 1, 'CRE' = 2, 'NOC' = 3, 'DIS' = 4)
)
ENGINE = MergeTree
PARTITION BY toYYYYMM(pickup_date)
ORDER BY pickup_datetime
SETTINGS storage_policy='s3_main'
INSERT INTO trips_s3 SELECT trip_id, pickup_date, pickup_datetime, dropoff_datetime, pickup_longitude, pickup_latitude, dropoff_longitude, dropoff_latitude, passenger_count, trip_distance, tip_amount, total_amount, payment_type FROM s3('https://ch-nyc-taxi.s3.eu-west-3.amazonaws.com/tsv/trips_{0..9}.tsv.gz', 'TabSeparatedWithNames') LIMIT 1000000;

ハードウェアによっては、この後の 100 万行の挿入の実行に数分かかる場合があります。system.processes テーブルで進行状況を確認できます。行数は最大 1,000 万行まで調整し、いくつかのサンプルクエリを試してみてください。

SELECT passenger_count, avg(tip_amount) AS avg_tip, avg(total_amount) AS avg_amount FROM trips_s3 GROUP BY passenger_count;

テーブルの変更

特定のテーブルのストレージポリシーを変更する必要が生じることがあります。これは可能ですが、いくつか制限があります。新しいターゲットポリシーには、以前のポリシーに含まれていたすべてのディスクおよびボリュームが含まれていなければなりません。つまり、ポリシー変更に合わせてデータが移動されることはありません。これらの制約を検証する際には、ボリュームとディスクは名前によって識別され、これに反する変更を行おうとするとエラーになります。ただし、前述の例を使用している場合、次の変更は許可されます。

<policies>
   <s3_main>
       <volumes>
           <main>
               <disk>s3</disk>
           </main>
       </volumes>
   </s3_main>
   <s3_tiered>
       <volumes>
           <hot>
               <disk>default</disk>
           </hot>
           <main>
               <disk>s3</disk>
           </main>
       </volumes>
       <move_factor>0.2</move_factor>
   </s3_tiered>
</policies>
ALTER TABLE trips_s3 MODIFY SETTING storage_policy='s3_tiered'

ここでは、新しい s3_tiered ポリシーで既存のメインボリュームを再利用し、新しいホットボリュームを導入します。これはデフォルトディスクを使用しており、このデフォルトディスクはパラメータ <path> で設定された 1 つのディスクのみで構成されています。ボリューム名とディスクが変わらない点に注意してください。テーブルへの新規挿入データは、move_factor * disk_size に到達するまではデフォルトディスク上に保持され、その時点でデータは S3 に移動されます。

レプリケーションの処理

S3 ディスクを用いたレプリケーションは、ReplicatedMergeTree テーブルエンジンを使用することで実現できます。詳細については、S3 Object Storage を使用して 2 つの AWS リージョン間で単一シャードをレプリケートする ガイドを参照してください。

読み取りと書き込み

以下の注意事項では、ClickHouse における S3 との連携実装について説明します。多くは参考情報レベルですが、パフォーマンス最適化 を行う際の助けとなる場合があります。

  • デフォルトでは、クエリ処理パイプラインの任意のステージで使用されるクエリ処理用スレッドの最大数は、コア数と同じになります。ステージによって並列化のしやすさが異なるため、この値が上限として機能します。ディスクからデータがストリーミングされるため、複数のクエリステージが同時に実行される場合があります。そのため、クエリで実際に使用されるスレッド数がこの値を超えることもあります。この動作は設定 max_threads で変更できます。
  • S3 上での読み取りは、デフォルトでは非同期で行われます。この挙動は remote_filesystem_read_method 設定によって決まり、デフォルト値は threadpool です。リクエストを処理する際、ClickHouse はストライプ単位でグラニュールを読み取ります。各ストライプには多くのカラムが含まれる可能性があります。1 本のスレッドが、そのグラニュールに対応するカラムを一つずつ読み取ります。これを同期的に行う代わりに、データを待つ前にすべてのカラムに対して先読み(prefetch)を行います。この方式により、各カラムを同期的に待機する場合と比べて大きな性能向上が得られます。多くのケースでは、この設定を変更する必要はありません。詳細は パフォーマンス最適化 を参照してください。
  • 書き込みは並列で実行され、最大 100 本のファイル書き込みスレッドが同時に動作します。max_insert_delayed_streams_for_parallel_write はデフォルト値 1000 で、並列に書き込まれる S3 の BLOB オブジェクトの数を制御します。書き込み中の各ファイルごとにバッファ(約 1MB)が必要になるため、これは実質的に INSERT のメモリ消費量の上限を制約します。サーバーのメモリが少ない環境では、この値を下げることが適切な場合があります。

S3 オブジェクトストレージを ClickHouse のディスクとして使用する

バケットと IAM ロールを作成するためのステップバイステップの手順が必要な場合は、Create S3 buckets and an IAM role を展開して、手順に従ってください。

S3バケットとIAMユーザーの作成

本記事では、AWS IAMユーザーの設定、S3バケットの作成、およびClickHouseでバケットをS3ディスクとして使用するための設定の基本を説明します。使用する権限についてはセキュリティチームと協議し、本記事の内容を出発点として検討してください。

AWS IAMユーザーの作成

この手順では、ログインユーザーではなく、サービスアカウントユーザーを作成します。

  1. AWS IAM管理コンソールにログインします。

  2. 「ユーザー」でユーザーを追加を選択します。

AWS IAM管理コンソール - 新しいユーザーの追加
  1. ユーザー名を入力し、認証情報タイプをアクセスキー - プログラムによるアクセスに設定して、次へ: アクセス許可を選択します。
IAMユーザーのユーザー名とアクセスタイプの設定
  1. ユーザーをどのグループにも追加せず、次へ: タグを選択します。
IAMユーザーのグループ割り当てをスキップ
  1. タグを追加する必要がない場合は、次へ: 確認を選択します。
IAMユーザーのタグ割り当てをスキップ
  1. ユーザーの作成を選択します。

    注記

    ユーザーに権限がないという警告メッセージは無視できます。次のセクションでバケットに対するユーザーの権限を付与します。

権限なしの警告を伴うIAMユーザーの作成
  1. ユーザーが作成されました。表示をクリックして、アクセスキーとシークレットキーをコピーします。
    注記

    キーを別の場所に保存してください。シークレットアクセスキーが利用可能なのはこの時のみです。

IAMユーザーアクセスキーの表示とコピー
  1. 閉じるをクリックし、ユーザー画面でユーザーを検索します。
ユーザーリストで新しく作成されたIAMユーザーを検索
  1. ARN(Amazon Resource Name)をコピーし、バケットのアクセスポリシーを設定する際に使用するために保存します。
IAMユーザーのARNをコピー

S3バケットの作成

  1. S3バケットセクションで、バケットを作成を選択します。
S3バケット作成プロセスの開始
  1. バケット名を入力し、その他のオプションはデフォルトのままにします。
    注記

    バケット名は組織内だけでなく、AWS全体で一意である必要があります。そうでない場合、エラーが発生します。

  2. パブリックアクセスをすべてブロックを有効のままにします。パブリックアクセスは不要です。
パブリックアクセスをブロックしたS3バケット設定の構成
  1. ページ下部のバケットを作成を選択します
S3バケット作成の完了
  1. リンクを選択してARNをコピーし、バケットのアクセスポリシー構成時に使用するために保存します。

  2. バケットが作成されたら、S3バケットリストから新しいS3バケットを見つけてリンクを選択します

バケットリストで新しく作成されたS3バケットを見つける
  1. フォルダを作成を選択します
S3バケット内に新しいフォルダを作成
  1. ClickHouse S3ディスクのターゲットとなるフォルダ名を入力し、フォルダを作成を選択します
ClickHouse S3ディスク使用のためのフォルダ名の設定
  1. フォルダがバケットリストに表示されます
S3バケット内の新しく作成されたフォルダの表示
  1. 新しいフォルダのチェックボックスを選択し、URLをコピーをクリックします。コピーしたURLは、次のセクションでClickHouseストレージ構成に使用するために保存します。
ClickHouse構成のためのS3フォルダURLのコピー
  1. アクセス許可タブを選択し、バケットポリシーセクションの編集ボタンをクリックします
S3バケットポリシー構成へのアクセス
  1. バケットポリシーを追加します。以下は例です:
{
  "Version": "2012-10-17",
  "Id": "Policy123456",
  "Statement": [
    {
      "Sid": "abc123",
      "Effect": "Allow",
      "Principal": {
        "AWS": "arn:aws:iam::921234567898:user/mars-s3-user"
      },
      "Action": "s3:*",
      "Resource": ["arn:aws:s3:::mars-doc-test", "arn:aws:s3:::mars-doc-test/*"]
    }
  ]
}
|パラメータ | 説明 | 例の値 |
|----------|-------------|----------------|
|Version | ポリシーインタープリタのバージョン、そのままにする | 2012-10-17 |
|Sid | ユーザー定義のポリシーID | abc123 |
|Effect | ユーザーリクエストを許可するか拒否するか | Allow |
|Principal | 許可されるアカウントまたはユーザー | arn:aws:iam::921234567898:user/mars-s3-user |
|Action | バケットで許可される操作 | s3:*|
|Resource | バケット内で操作が許可されるリソース | "arn:aws:s3:::mars-doc-test", "arn:aws:s3:::mars-doc-test/*" |
注記

使用する権限を決定するには、セキュリティチームと協力してください。これらを出発点として検討してください。 ポリシーと設定の詳細については、AWSドキュメントを参照してください: https://docs.aws.amazon.com/AmazonS3/latest/userguide/access-policy-language-overview.html

  1. ポリシー構成を保存します。

ClickHouse を構成して S3 バケットをディスクとして使用する

次の例は、デフォルトの ClickHouse ディレクトリを使用してサービスとしてインストールされた Linux の DEB パッケージを前提としています。

  1. ストレージ構成を保存するために、ClickHouse の config.d ディレクトリに新しいファイルを作成します。
vim /etc/clickhouse-server/config.d/storage_config.xml
  1. ストレージ設定として以下を追加し、先ほどの手順で取得したバケットパス、アクセスキー、シークレットキーに置き換えます
<clickhouse>
  <storage_configuration>
    <disks>
      <s3_disk>
        <type>s3</type>
        <endpoint>https://mars-doc-test.s3.amazonaws.com/clickhouse3/</endpoint>
        <access_key_id>ABC123</access_key_id>
        <secret_access_key>Abc+123</secret_access_key>
        <metadata_path>/var/lib/clickhouse/disks/s3_disk/</metadata_path>
      </s3_disk>
      <s3_cache>
        <type>cache</type>
        <disk>s3_disk</disk>
        <path>/var/lib/clickhouse/disks/s3_cache/</path>
        <max_size>10Gi</max_size>
      </s3_cache>
    </disks>
    <policies>
      <s3_main>
        <volumes>
          <main>
            <disk>s3_disk</disk>
          </main>
        </volumes>
      </s3_main>
    </policies>
  </storage_configuration>
</clickhouse>
注記

<disks> タグ内の s3_disk および s3_cache は任意のラベルです。これらは別の名前に設定できますが、そのディスクを参照するために、同じラベルを <policies> タグ配下の <disk> タグでも使用する必要があります。 <S3_main> タグも任意であり、ClickHouse 内でリソースを作成する際に、ストレージターゲットを識別するために使用されるポリシー名です。

上記の設定は ClickHouse バージョン 22.8 以降向けです。以前のバージョンを使用している場合は、storing data ドキュメントを参照してください。

S3 の利用に関する詳細については、次を参照してください: Integrations Guide: S3 Backed MergeTree

  1. ファイルの所有者を clickhouse ユーザーおよびグループに変更します
chown clickhouse:clickhouse /etc/clickhouse-server/config.d/storage_config.xml
  1. 変更を有効にするために ClickHouse インスタンスを再起動します。
service clickhouse-server restart

テスト

  1. 次のようなコマンドを使用して ClickHouse クライアントにログインします
clickhouse-client --user default --password ClickHouse123!
  1. 新しい S3 ストレージポリシーを指定したテーブルを作成する
CREATE TABLE s3_table1
           (
               `id` UInt64,
               `column1` String
           )
           ENGINE = MergeTree
           ORDER BY id
           SETTINGS storage_policy = 's3_main';
  1. テーブルが適切なポリシーで作成されていることを確認します
SHOW CREATE TABLE s3_table1;
┌─statement────────────────────────────────────────────────────
│ CREATE TABLE default.s3_table1
(
    `id` UInt64,
    `column1` String
)
ENGINE = MergeTree
ORDER BY id
SETTINGS storage_policy = 's3_main', index_granularity = 8192
└──────────────────────────────────────────────────────────────
  1. テーブルにテスト用の行を挿入する
INSERT INTO s3_table1
           (id, column1)
           VALUES
           (1, 'abc'),
           (2, 'xyz');
INSERT INTO s3_table1 (id, column1) FORMAT Values

Query id: 0265dd92-3890-4d56-9d12-71d4038b85d5

Ok.

2 rows in set. Elapsed: 0.337 sec.
  1. 行を確認する
SELECT * FROM s3_table1;
┌─id─┬─column1─┐
│  1 │ abc     │
│  2 │ xyz     │
└────┴─────────┘

2 行が結果に含まれます。経過時間: 0.284 秒。

6.  AWSコンソールで、バケットに移動し、新しく作成したバケットとフォルダを選択します。
以下のような表示が確認できます。

<Image img={S3J} size="lg" border alt="S3に保存されたClickHouseデータファイルを表示するAWSコンソールのS3バケットビュー" />

S3 オブジェクトストレージを使用して単一シャードを 2 つの AWS リージョン間でレプリケートする

ヒント

ClickHouse Cloud ではデフォルトでオブジェクトストレージが使用されるため、ClickHouse Cloud 上で実行している場合はこの手順を実施する必要はありません。

デプロイを計画する

このチュートリアルは、AWS EC2 上に 2 つの ClickHouse サーバーノードと 3 つの ClickHouse Keeper ノードをデプロイすることを前提としています。ClickHouse サーバーのデータストアには S3 を使用します。ディザスタリカバリをサポートするために、2 つの AWS リージョンを使用し、それぞれのリージョンに 1 つずつ ClickHouse サーバーと S3 バケットを配置します。

ClickHouse テーブルは 2 台のサーバー間でレプリケートされるため、2 つのリージョン間でもレプリケートされます。

ソフトウェアをインストールする

ClickHouse サーバーノード

ClickHouse サーバーノードでデプロイ手順を実行する際は、インストール手順 を参照してください。

ClickHouse をデプロイする

2 つのホストに ClickHouse をデプロイします。サンプル構成では、これらは chnode1chnode2 という名前になっています。

chnode1 を 1 つ目の AWS リージョンに、chnode2 を 2 つ目のリージョンに配置します。

ClickHouse Keeper をデプロイする

3 つのホストに ClickHouse Keeper をデプロイします。サンプル構成では、これらは keepernode1keepernode2keepernode3 という名前になっています。keepernode1chnode1 と同じリージョンに、keepernode2chnode2 と同じリージョンに、keepernode3 はいずれかのリージョン内で、そのリージョンの ClickHouse ノードとは別のアベイラビリティーゾーンにデプロイします。

ClickHouse Keeper ノードでデプロイ手順を実行する際は、インストール手順 を参照してください。

S3 バケットを作成する

chnode1chnode2 を配置したそれぞれのリージョンに、S3 バケットを 1 つずつ、合計 2 つ作成します。

バケットと IAM ロールの作成についてステップごとの手順が必要な場合は、S3 バケットと IAM ロールの作成 を展開して、手順に従ってください。

S3バケットとIAMユーザーの作成

本記事では、AWS IAMユーザーの設定、S3バケットの作成、およびClickHouseでバケットをS3ディスクとして使用するための設定の基本を説明します。使用する権限についてはセキュリティチームと協議し、本記事の内容を出発点として検討してください。

AWS IAMユーザーの作成

この手順では、ログインユーザーではなく、サービスアカウントユーザーを作成します。

  1. AWS IAM管理コンソールにログインします。

  2. 「ユーザー」でユーザーを追加を選択します。

AWS IAM管理コンソール - 新しいユーザーの追加
  1. ユーザー名を入力し、認証情報タイプをアクセスキー - プログラムによるアクセスに設定して、次へ: アクセス許可を選択します。
IAMユーザーのユーザー名とアクセスタイプの設定
  1. ユーザーをどのグループにも追加せず、次へ: タグを選択します。
IAMユーザーのグループ割り当てをスキップ
  1. タグを追加する必要がない場合は、次へ: 確認を選択します。
IAMユーザーのタグ割り当てをスキップ
  1. ユーザーの作成を選択します。

    注記

    ユーザーに権限がないという警告メッセージは無視できます。次のセクションでバケットに対するユーザーの権限を付与します。

権限なしの警告を伴うIAMユーザーの作成
  1. ユーザーが作成されました。表示をクリックして、アクセスキーとシークレットキーをコピーします。
    注記

    キーを別の場所に保存してください。シークレットアクセスキーが利用可能なのはこの時のみです。

IAMユーザーアクセスキーの表示とコピー
  1. 閉じるをクリックし、ユーザー画面でユーザーを検索します。
ユーザーリストで新しく作成されたIAMユーザーを検索
  1. ARN(Amazon Resource Name)をコピーし、バケットのアクセスポリシーを設定する際に使用するために保存します。
IAMユーザーのARNをコピー

S3バケットの作成

  1. S3バケットセクションで、バケットを作成を選択します。
S3バケット作成プロセスの開始
  1. バケット名を入力し、その他のオプションはデフォルトのままにします。
    注記

    バケット名は組織内だけでなく、AWS全体で一意である必要があります。そうでない場合、エラーが発生します。

  2. パブリックアクセスをすべてブロックを有効のままにします。パブリックアクセスは不要です。
パブリックアクセスをブロックしたS3バケット設定の構成
  1. ページ下部のバケットを作成を選択します
S3バケット作成の完了
  1. リンクを選択してARNをコピーし、バケットのアクセスポリシー構成時に使用するために保存します。

  2. バケットが作成されたら、S3バケットリストから新しいS3バケットを見つけてリンクを選択します

バケットリストで新しく作成されたS3バケットを見つける
  1. フォルダを作成を選択します
S3バケット内に新しいフォルダを作成
  1. ClickHouse S3ディスクのターゲットとなるフォルダ名を入力し、フォルダを作成を選択します
ClickHouse S3ディスク使用のためのフォルダ名の設定
  1. フォルダがバケットリストに表示されます
S3バケット内の新しく作成されたフォルダの表示
  1. 新しいフォルダのチェックボックスを選択し、URLをコピーをクリックします。コピーしたURLは、次のセクションでClickHouseストレージ構成に使用するために保存します。
ClickHouse構成のためのS3フォルダURLのコピー
  1. アクセス許可タブを選択し、バケットポリシーセクションの編集ボタンをクリックします
S3バケットポリシー構成へのアクセス
  1. バケットポリシーを追加します。以下は例です:
{
  "Version": "2012-10-17",
  "Id": "Policy123456",
  "Statement": [
    {
      "Sid": "abc123",
      "Effect": "Allow",
      "Principal": {
        "AWS": "arn:aws:iam::921234567898:user/mars-s3-user"
      },
      "Action": "s3:*",
      "Resource": ["arn:aws:s3:::mars-doc-test", "arn:aws:s3:::mars-doc-test/*"]
    }
  ]
}
|パラメータ | 説明 | 例の値 |
|----------|-------------|----------------|
|Version | ポリシーインタープリタのバージョン、そのままにする | 2012-10-17 |
|Sid | ユーザー定義のポリシーID | abc123 |
|Effect | ユーザーリクエストを許可するか拒否するか | Allow |
|Principal | 許可されるアカウントまたはユーザー | arn:aws:iam::921234567898:user/mars-s3-user |
|Action | バケットで許可される操作 | s3:*|
|Resource | バケット内で操作が許可されるリソース | "arn:aws:s3:::mars-doc-test", "arn:aws:s3:::mars-doc-test/*" |
注記

使用する権限を決定するには、セキュリティチームと協力してください。これらを出発点として検討してください。 ポリシーと設定の詳細については、AWSドキュメントを参照してください: https://docs.aws.amazon.com/AmazonS3/latest/userguide/access-policy-language-overview.html

  1. ポリシー構成を保存します。

その後、設定ファイルは /etc/clickhouse-server/config.d/ に配置します。以下は 1 つのバケットに対するサンプル設定ファイルであり、もう一方も同様ですが、ハイライトされている 3 行のみが異なります。

<clickhouse>
  <storage_configuration>
     <disks>
        <s3_disk>
           <type>s3</type>
        <!--highlight-start-->
           <endpoint>https://docs-clickhouse-s3.s3.us-east-2.amazonaws.com/clickhouses3/</endpoint>
           <access_key_id>ABCDEFGHIJKLMNOPQRST</access_key_id>
           <secret_access_key>Tjdm4kf5snfkj303nfljnev79wkjn2l3knr81007</secret_access_key>
        <!--highlight-end-->
           <metadata_path>/var/lib/clickhouse/disks/s3_disk/</metadata_path>
        </s3_disk>

        <s3_cache>
           <type>cache</type>
           <disk>s3_disk</disk>
           <path>/var/lib/clickhouse/disks/s3_cache/</path>
           <max_size>10Gi</max_size>
        </s3_cache>
     </disks>
        <policies>
            <s3_main>
                <volumes>
                    <main>
                        <disk>s3_disk</disk>
                    </main>
                </volumes>
            </s3_main>
    </policies>
   </storage_configuration>
</clickhouse>
注記

このガイドの多くの手順では、設定ファイルを /etc/clickhouse-server/config.d/ に配置するよう指示されます。これは、Linux システムにおける設定上書き用ファイルのデフォルトの場所です。これらのファイルをそのディレクトリに配置すると、ClickHouse はその内容を使用してデフォルト設定を上書きします。これらのファイルをオーバーライド用ディレクトリに配置しておくことで、アップグレード時に設定が失われるのを防ぐことができます。

ClickHouse Keeper を設定する

ClickHouse Keeper を単体で(ClickHouse サーバーとは別に)実行する場合、設定は 1 つの XML ファイルで構成されます。このチュートリアルでは、そのファイルは /etc/clickhouse-keeper/keeper_config.xml です。3 台すべての Keeper サーバーは同じ設定を使用しますが、1 か所だけ異なる設定があります。それが <server_id> です。

server_id は、その設定ファイルを使用するホストに割り当てられる ID を示します。以下の例では、server_id3 であり、ファイル内のさらに下にある <raft_configuration> セクションを見ると、サーバー 3 のホスト名が keepernode3 となっていることがわかります。ClickHouse Keeper プロセスは、この情報を基に、リーダーの選出やその他の処理全般を行う際に、どのサーバーへ接続すべきかを認識します。

<clickhouse>
    <logger>
        <level>trace</level>
        <log>/var/log/clickhouse-keeper/clickhouse-keeper.log</log>
        <errorlog>/var/log/clickhouse-keeper/clickhouse-keeper.err.log</errorlog>
        <size>1000M</size>
        <count>3</count>
    </logger>
    <listen_host>0.0.0.0</listen_host>
    <keeper_server>
        <tcp_port>9181</tcp_port>
<!--highlight-next-line-->
        <server_id>3</server_id>
        <log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
        <snapshot_storage_path>/var/lib/clickhouse/coordination/snapshots</snapshot_storage_path>

        <coordination_settings>
            <operation_timeout_ms>10000</operation_timeout_ms>
            <session_timeout_ms>30000</session_timeout_ms>
            <raft_logs_level>warning</raft_logs_level>
        </coordination_settings>

        <raft_configuration>
            <server>
                <id>1</id>
                <hostname>keepernode1</hostname>
                <port>9234</port>
            </server>
            <server>
                <id>2</id>
                <hostname>keepernode2</hostname>
                <port>9234</port>
            </server>
<!--highlight-start-->
            <server>
                <id>3</id>
                <hostname>keepernode3</hostname>
                <port>9234</port>
            </server>
<!--highlight-end-->
        </raft_configuration>
    </keeper_server>
</clickhouse>

ClickHouse Keeper の設定ファイルを所定の場所にコピーします(<server_id> を必ず設定してください):

sudo -u clickhouse \
  cp keeper.xml /etc/clickhouse-keeper/keeper.xml

ClickHouse サーバーの設定

クラスターの定義

ClickHouse のクラスターは設定の <remote_servers> セクション内で定義します。この例では cluster_1S_2R という 1 つのクラスターが定義されており、単一のシャードと 2 つのレプリカで構成されています。レプリカはそれぞれ chnode1chnode2 のホスト上に配置されています。

<clickhouse>
    <remote_servers replace="true">
        <cluster_1S_2R>
            <shard>
                <replica>
                    <host>chnode1</host>
                    <port>9000</port>
                </replica>
                <replica>
                    <host>chnode2</host>
                    <port>9000</port>
                </replica>
            </shard>
        </cluster_1S_2R>
    </remote_servers>
</clickhouse>

クラスタを扱う場合、クラスタ、シャード、レプリカの設定を DDL クエリに埋め込むマクロを定義しておくと便利です。このサンプルでは、shardreplica を個別に指定しなくても、レプリケーション対応テーブルエンジンを利用できるようにしています。テーブルを作成したら、system.tables をクエリすることで、shardreplica のマクロがどのように展開されているかを確認できます。

<clickhouse>
    <distributed_ddl>
            <path>/clickhouse/task_queue/ddl</path>
    </distributed_ddl>
    <macros>
        <cluster>cluster_1S_2R</cluster>
        <shard>1</shard>
        <replica>replica_1</replica>
    </macros>
</clickhouse>
注記

上記のマクロは chnode1 向けです。chnode2 では replicareplica_2 に設定してください。

ゼロコピー レプリケーションを無効化する

ClickHouse バージョン 22.7 以前では、allow_remote_fs_zero_copy_replication 設定は S3 および HDFS ディスクに対してデフォルトで true に設定されています。このディザスタリカバリのシナリオでは、この設定を false にする必要があり、バージョン 22.8 以降ではデフォルトで false に設定されています。

この設定は次の 2 つの理由から false にする必要があります。1) この機能はまだ本番環境での利用に十分な成熟度に達していないこと、2) ディザスタリカバリシナリオでは、データとメタデータの両方を複数のリージョンに保存する必要があることです。allow_remote_fs_zero_copy_replicationfalse に設定してください。

<clickhouse>
   <merge_tree>
        <allow_remote_fs_zero_copy_replication>false</allow_remote_fs_zero_copy_replication>
   </merge_tree>
</clickhouse>

ClickHouse Keeper は、ClickHouse ノード間でのデータレプリケーションを調整する役割を担います。ClickHouse に ClickHouse Keeper ノードを認識させるには、各 ClickHouse ノードに設定ファイルを追加します。

<clickhouse>
    <zookeeper>
        <node index="1">
            <host>keepernode1</host>
            <port>9181</port>
        </node>
        <node index="2">
            <host>keepernode2</host>
            <port>9181</port>
        </node>
        <node index="3">
            <host>keepernode3</host>
            <port>9181</port>
        </node>
    </zookeeper>
</clickhouse>

ネットワークの設定

サーバー同士、また利用者がサーバーと通信できるようにするために、AWS のセキュリティ設定を行う際は、ネットワークポートの一覧を参照してください。

3 台すべてのサーバーが、サーバー間および S3 との通信を行えるように、ネットワーク接続を待ち受けるよう構成する必要があります。デフォルトでは ClickHouse はループバックアドレスでのみ待ち受けるため、これを変更する必要があります。これは /etc/clickhouse-server/config.d/ で設定します。以下は、ClickHouse および ClickHouse Keeper がすべての IPv4 インターフェイスで待ち受けるように構成するサンプルです。詳細については、ドキュメントまたはデフォルトの設定ファイル /etc/clickhouse/config.xml を参照してください。

<clickhouse>
    <listen_host>0.0.0.0</listen_host>
</clickhouse>

サーバーを起動する

ClickHouse Keeper を起動する

各 Keeper サーバーで、使用しているオペレーティングシステムに応じたコマンドを実行します。例えば:

sudo systemctl enable clickhouse-keeper
sudo systemctl start clickhouse-keeper
sudo systemctl status clickhouse-keeper

ClickHouse Keeper の状態を確認する

netcat を使って ClickHouse Keeper にコマンドを送信します。たとえば、mntr は ClickHouse Keeper クラスターの状態を返します。各 Keeper ノードでこのコマンドを実行すると、1 つがリーダーで、残り 2 つがフォロワーであることを確認できます。

echo mntr | nc localhost 9181
zk_version      v22.7.2.15-stable-f843089624e8dd3ff7927b8a125cf3a7a769c069
zk_avg_latency  0
zk_max_latency  11
zk_min_latency  0
zk_packets_received     1783
zk_packets_sent 1783
# highlight-start \{#highlight-start}
zk_num_alive_connections        2
zk_outstanding_requests 0
zk_server_state leader
# highlight-end \{#highlight-end}
zk_znode_count  135
zk_watch_count  8
zk_ephemerals_count     3
zk_approximate_data_size        42533
zk_key_arena_size       28672
zk_latest_snapshot_size 0
zk_open_file_descriptor_count   182
zk_max_file_descriptor_count    18446744073709551615
# highlight-start \{#highlight-start}
zk_followers    2
zk_synced_followers     2
# highlight-end \{#highlight-end}

ClickHouse サーバーを起動する

各 ClickHouse サーバーで次を実行します。

sudo service clickhouse-server start

ClickHouse サーバーを検証する

クラスタ設定を追加したとき、2 つの ClickHouse ノード間でレプリケートされる 1 つのシャードが定義されました。この検証ステップでは、ClickHouse の起動時にクラスタが構築されていることを確認し、そのクラスタを使用してレプリケートされたテーブルを作成します。

  • クラスタが存在することを確認します:

    show clusters
    
    ┌─cluster───────┐
    │ cluster_1S_2R │
    └───────────────┘
    
    1 row in set. Elapsed: 0.009 sec. `
    
  • ReplicatedMergeTree テーブルエンジンを使用してクラスタ内にテーブルを作成します:

    create table trips on cluster 'cluster_1S_2R' (
     `trip_id` UInt32,
     `pickup_date` Date,
     `pickup_datetime` DateTime,
     `dropoff_datetime` DateTime,
     `pickup_longitude` Float64,
     `pickup_latitude` Float64,
     `dropoff_longitude` Float64,
     `dropoff_latitude` Float64,
     `passenger_count` UInt8,
     `trip_distance` Float64,
     `tip_amount` Float32,
     `total_amount` Float32,
     `payment_type` Enum8('UNK' = 0, 'CSH' = 1, 'CRE' = 2, 'NOC' = 3, 'DIS' = 4))
    ENGINE = ReplicatedMergeTree
    PARTITION BY toYYYYMM(pickup_date)
    ORDER BY pickup_datetime
    SETTINGS storage_policy='s3_main'
    
    ┌─host────┬─port─┬─status─┬─error─┬─num_hosts_remaining─┬─num_hosts_active─┐
    │ chnode1 │ 9000 │      0 │       │                   1 │                0 │
    │ chnode2 │ 9000 │      0 │       │                   0 │                0 │
    └─────────┴──────┴────────┴───────┴─────────────────────┴──────────────────┘
    
  • 先ほど定義したマクロの使われ方を確認する

    マクロ shardreplica前に定義しました。下のハイライトされた行では、各 ClickHouse ノードでそれぞれの値がどのように置き換えられているかを確認できます。さらに uuid という値も使用されています。uuid はシステムによって生成されるため、マクロ内では定義されていません。

    SELECT create_table_query
    FROM system.tables
    WHERE name = 'trips'
    FORMAT Vertical
    
    Query id: 4d326b66-0402-4c14-9c2f-212bedd282c0
    
    1 行目:
    ──────
    create_table_query: CREATE TABLE default.trips (`trip_id` UInt32, `pickup_date` Date, `pickup_datetime` DateTime, `dropoff_datetime` DateTime, `pickup_longitude` Float64, `pickup_latitude` Float64, `dropoff_longitude` Float64, `dropoff_latitude` Float64, `passenger_count` UInt8, `trip_distance` Float64, `tip_amount` Float32, `total_amount` Float32, `payment_type` Enum8('UNK' = 0, 'CSH' = 1, 'CRE' = 2, 'NOC' = 3, 'DIS' = 4))
    # highlight-next-line
    ENGINE = ReplicatedMergeTree('/clickhouse/tables/{uuid}/{shard}', '{replica}')
    PARTITION BY toYYYYMM(pickup_date) ORDER BY pickup_datetime SETTINGS storage_policy = 's3_main'
    
    1 行がセットに含まれています。経過時間: 0.012 秒。
    
注記

上記のzookeeperパス 'clickhouse/tables/{uuid}/{shard} は、default_replica_pathdefault_replica_name を設定することでカスタマイズできます。ドキュメントはこちらをご参照ください。

テスト

以下のテストでは、データが2つのサーバー間でレプリケーションされていること、およびローカルディスクではなくS3バケットに保存されていることを検証します。

  • New York Cityタクシーデータセットからデータを追加します:
INSERT INTO trips
SELECT trip_id,
       pickup_date,
       pickup_datetime,
       dropoff_datetime,
       pickup_longitude,
       pickup_latitude,
       dropoff_longitude,
       dropoff_latitude,
       passenger_count,
       trip_distance,
       tip_amount,
       total_amount,
       payment_type
   FROM s3('https://ch-nyc-taxi.s3.eu-west-3.amazonaws.com/tsv/trips_{0..9}.tsv.gz', 'TabSeparatedWithNames') LIMIT 1000000;
  • データが S3 に保存されていることを確認します。

    このクエリは、ディスク上のデータサイズと、どのディスクが使用されるかを決定するために使用されているストレージポリシーを表示します。

    SELECT
        engine,
        data_paths,
        metadata_path,
        storage_policy,
        formatReadableSize(total_bytes)
    FROM system.tables
    WHERE name = 'trips'
    FORMAT Vertical
    
    Query id: af7a3d1b-7730-49e0-9314-cc51c4cf053c
    
    Row 1:
    ──────
    engine:                          ReplicatedMergeTree
    data_paths:                      ['/var/lib/clickhouse/disks/s3_disk/store/551/551a859d-ec2d-4512-9554-3a4e60782853/']
    metadata_path:                   /var/lib/clickhouse/store/e18/e18d3538-4c43-43d9-b083-4d8e0f390cf7/trips.sql
    storage_policy:                  s3_main
    formatReadableSize(total_bytes): 36.42 MiB
    
    1 row in set. Elapsed: 0.009 sec.
    

    ローカルディスク上のデータサイズを確認します。上の結果から、保存されている数百万行のディスク上のサイズは 36.42 MiB です。これはローカルディスクではなく S3 上にあるはずです。上記のクエリからは、ローカルディスク上のデータとメタデータの保存場所も分かります。ローカル側のデータを確認します:

    root@chnode1:~# du -sh /var/lib/clickhouse/disks/s3_disk/store/551
    536K  /var/lib/clickhouse/disks/s3_disk/store/551
    

    各 S3 バケット内のデータを確認します(合計は表示されていませんが、両方のバケットにはインサート後に約 36 MiB が保存されています):

最初の S3 バケット内のデータサイズ(ストレージ使用量メトリクスを表示)
2 番目の S3 バケット内のデータサイズ(ストレージ使用量メトリクスを表示)

S3Express

S3Express は、Amazon S3 における新しい高性能な単一アベイラビリティーゾーン向けストレージクラスです。

S3Express を ClickHouse でテストした際の経験については、この ブログ記事 を参照してください。

注記

S3Express はデータを単一の AZ 内に保存します。これは、AZ 障害時にはデータにアクセスできなくなることを意味します。

S3 disk

S3Express バケット上のストレージをバックエンドとするテーブルを作成するには、次の手順を実行します。

  1. Directory タイプのバケットを作成する
  2. S3 ユーザーに必要なすべての権限を付与するため、適切なバケットポリシーを設定する(例: 無制限アクセスを許可するために "Action": "s3express:*" を設定)
  3. ストレージポリシーを設定する際に、region パラメータを指定する

ストレージ構成は通常の S3 と同じであり、たとえば次のようになります。

<storage_configuration>
    <disks>
        <s3_express>
            <type>s3</type>
            <endpoint>https://my-test-bucket--eun1-az1--x-s3.s3express-eun1-az1.eu-north-1.amazonaws.com/store/</endpoint>
            <region>eu-north-1</region>
            <access_key_id>...</access_key_id>
            <secret_access_key>...</secret_access_key>
        </s3_express>
    </disks>
    <policies>
        <s3_express>
            <volumes>
                <main>
                    <disk>s3_express</disk>
                </main>
            </volumes>
        </s3_express>
    </policies>
</storage_configuration>

次に、新しいストレージ上にテーブルを作成します。

CREATE TABLE t
(
    a UInt64,
    s String
)
ENGINE = MergeTree
ORDER BY a
SETTINGS storage_policy = 's3_express';

S3 ストレージ

S3 ストレージもサポートされていますが、Object URL 形式のパスに対してのみ利用できます。例:

SELECT * FROM s3('https://test-bucket--eun1-az1--x-s3.s3express-eun1-az1.eu-north-1.amazonaws.com/file.csv', ...)

また、設定でバケットのリージョンを指定する必要もあります:

<s3>
    <perf-bucket-url>
        <endpoint>https://test-bucket--eun1-az1--x-s3.s3express-eun1-az1.eu-north-1.amazonaws.com</endpoint>
        <region>eu-north-1</region>
    </perf-bucket-url>
</s3>

バックアップ

上で作成したディスクにバックアップを保存できます。

BACKUP TABLE t TO Disk('s3_express', 't.zip')

┌─id───────────────────────────────────┬─status─────────┐
│ c61f65ac-0d76-4390-8317-504a30ba7595 │ BACKUP_CREATED │
└──────────────────────────────────────┴────────────────┘
RESTORE TABLE t AS t_restored FROM Disk('s3_express', 't.zip')

┌─id───────────────────────────────────┬─status───┐
│ 4870e829-8d76-4171-ae59-cffaf58dea04 │ RESTORED │
└──────────────────────────────────────┴──────────┘