メインコンテンツへスキップ
メインコンテンツへスキップ

Distributed テーブルエンジン

ClickHouse Cloud における Distributed エンジン

ClickHouse Cloud で Distributed テーブルエンジンを作成するには、remote および remoteSecure テーブル関数を使用します。 Distributed(...) 構文は ClickHouse Cloud では使用できません。

Distributed エンジンを持つテーブル自体はデータを一切保存しませんが、複数のサーバーでの分散クエリ処理を可能にします。 読み取り処理は自動的に並列化されます。読み取り時には、リモートサーバー上にテーブルインデックスが存在する場合、それらが利用されます。

テーブルの作成

CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(
    name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1],
    name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2],
    ...
) ENGINE = Distributed(cluster, database, table[, sharding_key[, policy_name]])
[SETTINGS name=value, ...]

テーブルから

Distributed テーブルが現在のサーバー上のテーブルを参照している場合、そのテーブルのスキーマを利用できます。

CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster] AS [db2.]name2 ENGINE = Distributed(cluster, database, table[, sharding_key[, policy_name]]) [SETTINGS name=value, ...]

Distributed パラメータ

ParameterDescription
clusterサーバーの設定ファイル内のクラスター名
databaseリモートデータベース名
tableリモートテーブル名
sharding_key (Optional)シャーディングキー。
sharding_key の指定が必要となるケースは次のとおりです。
  • Distributed テーブルへの INSERT の場合(テーブルエンジンがデータの分割方法を決定するために sharding_key を必要とするため)。ただし、insert_distributed_one_random_shard 設定が有効な場合は、INSERT にシャーディングキーは不要です。
  • optimize_skip_unused_shards を利用する場合(どのシャードをクエリするかを決定するために sharding_key が必要です)。
policy_name (Optional)ポリシー名。バックグラウンド送信処理で使用する一時ファイルを保存するために使用されます

関連項目

Distributed 設定

SettingDescriptionDefault value
fsync_after_insertDistributed へのバックグラウンド挿入後にファイルデータに対して fsync を実行します。OS が イニシエーターノード のディスク上の挿入済みデータ全体をファイルにフラッシュしたことを保証します。false
fsync_directoriesディレクトリに対して fsync を実行します。Distributed テーブルでのバックグラウンド挿入に関連する操作(挿入後、シャードへのデータ送信後など)の後で、OS がディレクトリメタデータを更新したことを保証します。false
skip_unavailable_shardstrue の場合、ClickHouse は利用不能なシャードを黙って自動的にスキップします。シャードは次の場合に利用不能とマークされます: 1) 接続障害によりシャードに到達できない場合。2) シャードが DNS で解決できない場合。3) テーブルがそのシャード上に存在しない場合。false
bytes_to_throw_insertバックグラウンド INSERT のために保留中の圧縮データ量(バイト数)がこの値を超えた場合、例外がスローされます。0 の場合はスローしません。0
bytes_to_delay_insertバックグラウンド INSERT のために保留中の圧縮データ量(バイト数)がこの値を超えた場合、クエリは遅延されます。0 の場合は遅延しません。0
max_delay_to_insertバックグラウンド送信のために保留中のバイト数が多い場合に、Distributed テーブルへのデータ挿入を遅延させる最大秒数。60
background_insert_batchdistributed_background_insert_batch と同じです。0
background_insert_split_batch_on_failuredistributed_background_insert_split_batch_on_failure と同じです。0
background_insert_sleep_time_msdistributed_background_insert_sleep_time_ms と同じです。0
background_insert_max_sleep_time_msdistributed_background_insert_max_sleep_time_ms と同じです。0
flush_on_detachDETACH/DROP/サーバーシャットダウン時に、リモートノードへデータをフラッシュします。true
注記

耐久性設定 (fsync_...):

  • データがまずイニシエーターノードのディスクに保存され、その後バックグラウンドでシャードへ送信される、バックグラウンド INSERT(つまり distributed_foreground_insert=false)にのみ影響します。
  • INSERT のパフォーマンスを大きく低下させる可能性があります。
  • 分散テーブルフォルダ内に保存されているデータを、挿入を受け付けたノード に書き込む処理に影響します。基盤となる MergeTree テーブルへの書き込み保証が必要な場合は、system.merge_tree_settings 内の耐久性設定(...fsync...)を参照してください。

挿入制限設定..._insert)については、次も参照してください:

CREATE TABLE hits_all AS hits
ENGINE = Distributed(logs, default, hits[, sharding_key[, policy_name]])
SETTINGS
    fsync_after_insert=0,
    fsync_directories=0;

logs クラスター内のすべてのサーバーに存在する default.hits テーブルからデータが読み出されます。データは読み出されるだけでなく、可能な範囲でリモートサーバー側で部分的に処理されます。例えば、GROUP BY を含むクエリの場合、データはリモートサーバー上で集約され、集約関数の中間状態がリクエスト元のサーバーに送信されます。その後、そのサーバーでデータがさらに集約されます。

データベース名の代わりに、文字列を返す定数式を使用できます。例えば、currentDatabase() です。

クラスター

クラスターはサーバー設定ファイルで構成されます。

<remote_servers>
    <logs>
        <!-- 分散クエリ用の、クラスタごとのサーバ間シークレット。
             既定値: シークレットなし (認証は行われません)。

             設定した場合、分散クエリはシャード側で検証されるため、少なくとも次を満たす必要があります:
             - 対応するクラスタがシャード上に存在していること
             - そのクラスタが同じシークレットを持っていること

             さらに (より重要な点として)、initial_user が
             クエリの現在のユーザーとして使用されます。
        -->
        <!-- <secret></secret> -->
        
        <!-- 任意。このクラスタで分散DDLクエリ (ON CLUSTER 句) を許可するかどうか。既定値: true (許可)。 -->        
        <!-- <allow_distributed_ddl_queries>true</allow_distributed_ddl_queries> -->
        
        <shard>
            <!-- 任意。データ書き込み時のシャードの重み。既定値: 1。 -->
            <weight>1</weight>
            <!-- 任意。シャード名。空ではなく、かつクラスタ内の他のシャードと重複しない必要があります。指定しない場合は空になります。 -->
            <name>shard_01</name>
            <!-- 任意。データを1つのレプリカのみに書き込むかどうか。既定値: false (すべてのレプリカに書き込む)。 -->
            <internal_replication>false</internal_replication>
            <replica>
                <!-- 任意。負荷分散時のレプリカの優先度 (load_balancing 設定も参照)。既定値: 1 (値が小さいほど優先度が高い)。 -->
                <priority>1</priority>
                <host>example01-01-1</host>
                <port>9000</port>
            </replica>
            <replica>
                <host>example01-01-2</host>
                <port>9000</port>
            </replica>
        </shard>
        <shard>
            <weight>2</weight>
            <name>shard_02</name>
            <internal_replication>false</internal_replication>
            <replica>
                <host>example01-02-1</host>
                <port>9000</port>
            </replica>
            <replica>
                <host>example01-02-2</host>
                <secure>1</secure>
                <port>9440</port>
            </replica>
        </shard>
    </logs>
</remote_servers>

ここでは、logs という名前のクラスタが定義されており、2 つのシャード(各シャードには 2 つのレプリカを含む)で構成されています。シャードとは、データの異なる部分を保持しているサーバーのことであり(すべてのデータを読み取るには、すべてのシャードにアクセスする必要があります)、レプリカはサーバーの複製です(すべてのデータを読み取るには、いずれか 1 つのレプリカ上のデータにアクセスすれば十分です)。

クラスタ名にはドットを含めてはいけません。

各サーバーには、hostport、および必要に応じて userpasswordsecurecompressionbind_host のパラメータを指定します。

ParameterDescriptionDefault Value
hostリモートサーバーのアドレス。ドメイン名、IPv4 アドレス、または IPv6 アドレスを使用できます。ドメイン名を指定した場合、サーバー起動時に DNS リクエストが実行され、その結果はサーバーが稼働している間保持されます。DNS リクエストが失敗すると、サーバーは起動しません。DNS レコードを変更した場合は、サーバーを再起動してください。-
portメッセージ送受信用に使用される TCP ポート(設定ファイル内の tcp_port、通常は 9000 に設定)。http_port と混同しないでください。-
userリモートサーバーへ接続するためのユーザー名。このユーザーは指定したサーバーへの接続権限を持っている必要があります。アクセス権限は users.xml ファイルで設定します。詳細は Access rights セクションを参照してください。default
passwordリモートサーバーへ接続するためのパスワード(マスクされません)。''
secureセキュアな SSL/TLS 接続を使用するかどうか。通常、ポートの指定も必要です(デフォルトのセキュアポートは 9440)。サーバーは <tcp_port_secure>9440</tcp_port_secure> でリッスンし、正しい証明書が設定されている必要があります。false
compressionデータ圧縮を使用するかどうか。true
bind_hostこのノードからリモートサーバーへ接続する際に使用する送信元アドレス。IPv4 アドレスのみサポートされます。ClickHouse の分散クエリで使用される送信元 IP アドレスを指定する必要がある、高度なデプロイメントのユースケース向けです。-

レプリカを指定すると、読み取り時に各シャードに対して利用可能なレプリカのうち 1 つが選択されます。ロードバランシングアルゴリズム(どのレプリカへアクセスするかの優先度)は、load_balancing 設定で構成できます。サーバーとの接続が確立できない場合、短いタイムアウトで接続を試行します。接続に失敗した場合は次のレプリカが選択され、すべてのレプリカについて同様に繰り返されます。すべてのレプリカへの接続試行が失敗した場合、同じ方法で複数回リトライされます。これはレジリエンス向上には有効ですが、完全なフォールトトレランスを提供するものではありません。リモートサーバーが接続を受け付けても、正常に動作しない、または性能が不十分な場合があるためです。

シャードを 1 つだけ指定することもできます(この場合、クエリ処理は「分散」ではなく「リモート」と呼ぶべきです)し、任意の数のシャードを指定することもできます。各シャード内では、1 つから任意の数のレプリカを指定できます。シャードごとに異なる数のレプリカを指定することも可能です。

設定内には、必要な数だけクラスターを指定できます。

クラスターを確認するには、system.clusters テーブルを使用します。

Distributed エンジンを使用すると、クラスターをローカルサーバーのように扱うことができます。ただし、クラスターの設定は動的に指定することはできず、サーバーの設定ファイルで構成しておく必要があります。通常、クラスター内のすべてのサーバーは同一のクラスター設定を持ちます(必須ではありません)。設定ファイル内のクラスターは、サーバーを再起動することなくオンザフライで更新されます。

毎回未知のシャードやレプリカの集合に対してクエリを送信する必要がある場合、Distributed テーブルを作成する必要はありません。その代わりに remote テーブル関数を使用してください。詳細は Table functions セクションを参照してください。

データの書き込み

クラスターにデータを書き込む方法は 2 つあります。

1 つ目は、どのサーバーにどのデータを書き込むかを自分で定義し、各シャードに直接書き込む方法です。言い換えると、Distributed テーブルが参照しているクラスター内のリモートテーブルに対して、直接 INSERT 文を実行します。これは、任意のシャーディング方式を使用できるため、対象分野の要件により複雑な方式であっても対応できる、最も柔軟な方法です。また、この方式では異なるシャードに完全に独立してデータを書き込めるため、最も効率的でもあります。

2 つ目は、Distributed テーブルに対して INSERT 文を実行する方法です。この場合、テーブル自体が挿入されたデータをサーバー間に分散します。Distributed テーブルに書き込むには、sharding_key パラメータが設定されている必要があります(シャードが 1 つしかない場合を除く)。

各シャードには、設定ファイル内で <weight> を定義できます。デフォルトでは weight は 1 です。データは、シャードの weight に比例した量でシャード間に分散されます。すべてのシャードの weight が合計され、その後、各シャードの weight を合計値で割ることで、各シャードの比率が決まります。例えば、2 つのシャードがあり、1 つ目の weight が 1、2 つ目の weight が 2 の場合、1 つ目のシャードには挿入された行の 3 分の 1 (1 / 3)、2 つ目のシャードには 3 分の 2 (2 / 3) が送られます。

各シャードには、設定ファイル内で internal_replication パラメータを定義できます。このパラメータが true に設定されている場合、書き込み処理は最初の正常なレプリカを選択し、そのレプリカにデータを書き込みます。これは、Distributed テーブルの背後にあるテーブルがレプリケートされたテーブル(例: 任意の Replicated*MergeTree テーブルエンジン)である場合に使用します。テーブルレプリカのうち 1 つが書き込みを受け取り、その後自動的に他のレプリカへレプリケートされます。

internal_replicationfalse(デフォルト)に設定されている場合、データはすべてのレプリカに書き込まれます。この場合、Distributed テーブル自体がデータを複製します。これは、レプリケートされたテーブルを使用する場合よりも劣ります。というのも、レプリカ間の一貫性が検査されず、時間の経過とともに、レプリカごとにわずかに異なるデータを保持するようになるためです。

どのシャードに行データを送るかを選択するために、シャーディング式が評価され、その結果をシャードの総 weight で割った余りが取られます。行は、余りが prev_weights から prev_weights + weight までの半開区間に対応するシャードに送られます。ここで、prev_weights は番号がより小さいシャードの総 weight、weight はそのシャード自身の weight です。例えば、2 つのシャードがあり、1 つ目の weight が 9、2 つ目の weight が 10 の場合、余りが範囲 [0, 9) に入る行は 1 つ目のシャードに、範囲 [9, 19) に入る行は 2 つ目のシャードに送られます。

シャーディング式は、定数やテーブル列からなる任意の式であり、整数を返す必要があります。例えば、データをランダムに分散するには rand() を使用できますし、ユーザー ID を割った余りで分散するには UserID を使用できます(この場合、1 人のユーザーのデータは 1 つのシャードにのみ配置されるため、ユーザー単位の INJOIN を実行しやすくなります)。ある列が十分に均等に分散されない場合は、intHash64(UserID) のようにハッシュ関数でラップできます。

単純な除算の余りによる方法は、シャーディングの解決策としては限定的であり、常に適切というわけではありません。これは、中〜大規模(サーバーが数十台)のデータ量では機能しますが、非常に大規模(サーバーが数百台以上)のデータ量には向きません。後者の場合、Distributed テーブルを使用するのではなく、対象分野で求められるシャーディング方式を使用してください。

次のような場合には、シャーディング方式について検討する必要があります。

  • 特定のキーでデータを結合する(IN または JOIN)クエリを使用している場合、そのキーでデータがシャーディングされていれば、GLOBAL INGLOBAL JOIN よりもはるかに効率的なローカルな IN または JOIN を使用できます。
  • 多数のサーバー(数百台以上)を使用し、多数の小さなクエリ、たとえば個々のクライアント(ウェブサイト、広告主、パートナーなど)のデータに対するクエリを実行する場合。小さなクエリがクラスター全体に影響しないようにするには、1 クライアントのデータを 1 シャード上に配置するのが理にかなっています。あるいは、二段階のシャーディングを構成することもできます。クラスター全体を複数の「レイヤー」に分割し、レイヤーは複数のシャードから構成されるようにします。1 クライアントのデータは 1 つのレイヤー内に配置されますが、必要に応じてそのレイヤーにシャードを追加でき、データはそれらのシャード内でランダムに分散されます。各レイヤーに対して Distributed テーブルを作成し、グローバルなクエリ用に 1 つの共有の Distributed テーブルを作成します。

データはバックグラウンドで書き込まれます。テーブルに INSERT されたとき、データブロックはローカルファイルシステムに書き込まれるだけです。データは可能な限り早くバックグラウンドでリモートサーバーへ送信されます。データ送信の周期は、distributed_background_insert_sleep_time_ms および distributed_background_insert_max_sleep_time_ms の設定で制御されます。Distributed エンジンは挿入されたデータを含むファイルを個別に送信しますが、distributed_background_insert_batch 設定を有効にすることで、ファイルのバッチ送信を有効化できます。この設定により、ローカルサーバーおよびネットワークリソースをより有効に活用することで、クラスターのパフォーマンスが向上します。テーブルディレクトリ /var/lib/clickhouse/data/database/table/ にあるファイル(送信待ちデータ)の一覧を確認することで、データが正常に送信されているか確認する必要があります。バックグラウンドタスクを実行するスレッド数は、background_distributed_schedule_pool_size 設定で指定できます。

Distributed テーブルへの INSERT 実行後に、サーバーが消失した場合や(ハードウェア障害などにより)クラッシュして強制再起動された場合、挿入されたデータが失われる可能性があります。テーブルディレクトリ内で破損したデータパーツが検出されると、それは broken サブディレクトリに移動され、以後は使用されません。

データの読み取り

Distributed テーブルをクエリする場合、SELECT クエリはすべてのシャードに送信され、データがシャード間でどのように分散されているかに関係なく動作します(完全にランダムに分散されていても問題ありません)。新しいシャードを追加する場合、既存のデータをそのシャードへ移行する必要はありません。その代わり、新しいシャードに対してより大きな重み付けを指定して新しいデータを書き込むことができます。この場合、データの分散はやや不均一になりますが、クエリは正しくかつ効率的に動作します。

max_parallel_replicas オプションが有効な場合、クエリ処理は 1 つのシャード内のすべてのレプリカに対して並列化されます。詳細については、max_parallel_replicas セクションを参照してください。

分散環境における in および global in クエリがどのように処理されるかの詳細については、こちら のドキュメントを参照してください。

仮想カラム

_Shard_num

_shard_num — テーブル system.clustersshard_num の値を保持します。型: UInt32

注記

remote および [cluster](../../../sql-reference/table-functions/cluster.md) テーブル関数は内部的に一時的な Distributed テーブルを作成するため、_shard_num` はそれらでも利用可能です。

関連項目