メインコンテンツへスキップ
メインコンテンツへスキップ

Spark コネクタ

このコネクタは、高度なパーティション分割や述語プッシュダウンなど、ClickHouse 固有の最適化を活用して、 クエリパフォーマンスとデータ処理を向上させます。 このコネクタは ClickHouse の公式 JDBC コネクタ をベースとしており、 独自のカタログを管理します。

Spark 3.0 以前は、Spark に組み込みのカタログという概念がなかったため、ユーザーは通常、 Hive Metastore や AWS Glue などの外部カタログシステムに依存していました。 これらの外部ソリューションでは、Spark でテーブルにアクセスする前に、ユーザーがデータソーステーブルを手動で登録する必要がありました。 しかし Spark 3.0 以降、カタログの概念が導入されたことで、Spark はカタログプラグインを登録することでテーブルを自動的に検出できるようになりました。

Spark のデフォルトのカタログは spark_catalog であり、テーブルは {catalog name}.{database}.{table} という形式で識別されます。 新しいカタログ機能により、1 つの Spark アプリケーション内で複数のカタログを追加して利用できるようになりました。

要件

  • Java 8 または 17
  • Scala 2.12 または 2.13
  • Apache Spark 3.3、3.4、または 3.5

互換性マトリクス

バージョン対応する Spark バージョンClickHouse JDBC バージョン
mainSpark 3.3, 3.4, 3.50.6.3
0.8.1Spark 3.3, 3.4, 3.50.6.3
0.8.0Spark 3.3, 3.4, 3.50.6.3
0.7.3Spark 3.3, 3.40.4.6
0.6.0Spark 3.30.3.2-patch11
0.5.0Spark 3.2, 3.30.3.2-patch11
0.4.0Spark 3.2, 3.3依存なし
0.3.0Spark 3.2, 3.3依存なし
0.2.1Spark 3.2依存なし
0.1.2Spark 3.2依存なし

インストール & セットアップ

Spark と ClickHouse を連携するためのインストール方法には、プロジェクト構成に応じていくつかの選択肢があります。 pom.xml(Maven の場合)や build.sbt(SBT の場合)など、プロジェクトのビルドファイルに ClickHouse Spark connector を依存関係として直接追加できます。 または、必要な JAR ファイルを $SPARK_HOME/jars/ フォルダに配置するか、spark-submit コマンドで --jars フラグを使用して Spark のオプションとして直接指定することもできます。 いずれの方法でも、Spark 環境で ClickHouse connector を利用可能にできます。

依存関係としてインポート

<dependency>
  <groupId>com.clickhouse.spark</groupId>
  <artifactId>clickhouse-spark-runtime-{{ spark_binary_version }}_{{ scala_binary_version }}</artifactId>
  <version>{{ stable_version }}</version>
</dependency>
<dependency>
  <groupId>com.clickhouse</groupId>
  <artifactId>clickhouse-jdbc</artifactId>
  <classifier>all</classifier>
  <version>{{ clickhouse_jdbc_version }}</version>
  <exclusions>
    <exclusion>
      <groupId>*</groupId>
      <artifactId>*</artifactId>
    </exclusion>
  </exclusions>
</dependency>

SNAPSHOT バージョンを使用する場合は、次のリポジトリを追加してください。

<repositories>
  <repository>
    <id>sonatype-oss-snapshots</id>
    <name>Sonatype OSS Snapshots Repository</name>
    <url>https://s01.oss.sonatype.org/content/repositories/snapshots</url>
  </repository>
</repositories>

ライブラリのダウンロード

バイナリ JAR の名前のパターンは次のとおりです。

clickhouse-spark-runtime-${spark_binary_version}_${scala_binary_version}-${version}.jar

利用可能なリリース済みのすべての JAR ファイルは Maven Central Repository から入手でき、 日次ビルドのすべての SNAPSHOT JAR ファイルは Sonatype OSS Snapshots Repository から入手できます。

参考文献

コネクタは clickhouse-http および clickhouse-client に依存しており、 これらはどちらも clickhouse-jdbc:all にバンドルされているため、 classifier が "all" の clickhouse-jdbc JAR を必ず含めてください。 代わりに、フルの JDBC パッケージを使用したくない場合は、 clickhouse-client JARclickhouse-http を 個別に追加することもできます。

いずれの場合でも、Compatibility Matrix に従って パッケージのバージョンに互換性があることを確認してください。

カタログを登録する(必須)

ClickHouse のテーブルにアクセスするには、次の設定を使用して新しい Spark カタログを設定する必要があります。

PropertyValueDefault ValueRequired
spark.sql.catalog.<catalog_name>com.clickhouse.spark.ClickHouseCatalogN/AYes
spark.sql.catalog.<catalog_name>.host<clickhouse_host>localhostNo
spark.sql.catalog.<catalog_name>.protocolhttphttpNo
spark.sql.catalog.<catalog_name>.http_port<clickhouse_port>8123No
spark.sql.catalog.<catalog_name>.user<clickhouse_username>defaultNo
spark.sql.catalog.<catalog_name>.password<clickhouse_password>(empty string)No
spark.sql.catalog.<catalog_name>.database<database>defaultNo
spark.<catalog_name>.write.formatjsonarrowNo

これらの設定は、次のいずれかの方法で指定できます。

  • spark-defaults.conf を編集または新規作成する。
  • 設定を spark-submit コマンド(または spark-shell / spark-sql の CLI コマンド)に渡す。
  • コンテキストを初期化する際に設定を追加する。
参考文献

ClickHouse クラスターを使用する場合は、各インスタンスごとに一意のカタログ名を設定する必要があります。 例:

spark.sql.catalog.clickhouse1                com.clickhouse.spark.ClickHouseCatalog
spark.sql.catalog.clickhouse1.host           10.0.0.1
spark.sql.catalog.clickhouse1.protocol       https
spark.sql.catalog.clickhouse1.http_port      8443
spark.sql.catalog.clickhouse1.user           default
spark.sql.catalog.clickhouse1.password
spark.sql.catalog.clickhouse1.database       default
spark.sql.catalog.clickhouse1.option.ssl     true

spark.sql.catalog.clickhouse2                com.clickhouse.spark.ClickHouseCatalog
spark.sql.catalog.clickhouse2.host           10.0.0.2
spark.sql.catalog.clickhouse2.protocol       https
spark.sql.catalog.clickhouse2.http_port      8443
spark.sql.catalog.clickhouse2.user           default
spark.sql.catalog.clickhouse2.password
spark.sql.catalog.clickhouse2.database       default
spark.sql.catalog.clickhouse2.option.ssl     true

このようにすると、Spark SQL からは clickhouse1 のテーブル <ck_db>.<ck_table> には clickhouse1.<ck_db>.<ck_table> としてアクセスでき、clickhouse2 のテーブル <ck_db>.<ck_table> には clickhouse2.<ck_db>.<ck_table> としてアクセスできるようになります。

ClickHouse Cloud の設定

ClickHouse Cloud に接続する際は、SSL を有効にし、適切な SSL モードを設定してください。例えば、次のとおりです。

spark.sql.catalog.clickhouse.option.ssl        true
spark.sql.catalog.clickhouse.option.ssl_mode   NONE

データの読み込み

public static void main(String[] args) {
        // Spark セッションを作成
        SparkSession spark = SparkSession.builder()
                .appName("example")
                .master("local[*]")
                .config("spark.sql.catalog.clickhouse", "com.clickhouse.spark.ClickHouseCatalog")
                .config("spark.sql.catalog.clickhouse.host", "127.0.0.1")
                .config("spark.sql.catalog.clickhouse.protocol", "http")
                .config("spark.sql.catalog.clickhouse.http_port", "8123")
                .config("spark.sql.catalog.clickhouse.user", "default")
                .config("spark.sql.catalog.clickhouse.password", "123456")
                .config("spark.sql.catalog.clickhouse.database", "default")
                .config("spark.clickhouse.write.format", "json")
                .getOrCreate();

        Dataset<Row> df = spark.sql("select * from clickhouse.default.example_table");

        df.show();

        spark.stop();
    }

データの書き込み

 public static void main(String[] args) throws AnalysisException {

        // Sparkセッションを作成
        SparkSession spark = SparkSession.builder()
                .appName("example")
                .master("local[*]")
                .config("spark.sql.catalog.clickhouse", "com.clickhouse.spark.ClickHouseCatalog")
                .config("spark.sql.catalog.clickhouse.host", "127.0.0.1")
                .config("spark.sql.catalog.clickhouse.protocol", "http")
                .config("spark.sql.catalog.clickhouse.http_port", "8123")
                .config("spark.sql.catalog.clickhouse.user", "default")
                .config("spark.sql.catalog.clickhouse.password", "123456")
                .config("spark.sql.catalog.clickhouse.database", "default")
                .config("spark.clickhouse.write.format", "json")
                .getOrCreate();

        // DataFrameのスキーマを定義
        StructType schema = new StructType(new StructField[]{
                DataTypes.createStructField("id", DataTypes.IntegerType, false),
                DataTypes.createStructField("name", DataTypes.StringType, false),
        });

        List<Row> data = Arrays.asList(
                RowFactory.create(1, "Alice"),
                RowFactory.create(2, "Bob")
        );

        // DataFrameを作成
        Dataset<Row> df = spark.createDataFrame(data, schema);

        df.writeTo("clickhouse.default.example_table").append();

        spark.stop();
    }

DDL 操作

Spark SQL を使用して ClickHouse インスタンスに対して DDL 操作を実行でき、すべての変更は即座に ClickHouse に永続化されます。 Spark SQL では、ClickHouse で実行するのとまったく同じようにクエリを記述できるため、 CREATE TABLE や TRUNCATE などのコマンドを、変更せずにそのまま直接実行できます。例えば次のとおりです。

注記

Spark SQL を使用する場合、一度に実行できるステートメントは 1 つだけです。

USE clickhouse; 

CREATE TABLE test_db.tbl_sql (
  create_time TIMESTAMP NOT NULL,
  m           INT       NOT NULL COMMENT 'part key',
  id          BIGINT    NOT NULL COMMENT 'sort key',
  value       STRING
) USING ClickHouse
PARTITIONED BY (m)
TBLPROPERTIES (
  engine = 'MergeTree()',
  order_by = 'id',
  settings.index_granularity = 8192
);

上記の例は Spark SQL クエリを示しており、Java、Scala、PySpark、またはシェルなどのいずれの API を使用しても、アプリケーション内で実行できます。

設定

以下は、このコネクタで調整可能な設定項目です。


キーデフォルト概要以降
spark.clickhouse.ignoreUnsupportedTransformfalseClickHouse は、シャーディングキーやパーティション値として複雑な式(例: cityHash64(col_1, col_2))を使用できますが、これらは現在 Spark ではサポートされていません。true の場合はサポートされていない式を無視し、それ以外の場合は例外をスローして即座にエラー終了します。なお、spark.clickhouse.write.distributed.convertLocal が有効な場合、サポートされていないシャーディングキーを無視するとデータが破損するおそれがあります。0.4.0
spark.clickhouse.read.compression.codeclz4読み取り時にデータを解凍するために使用するコーデック。サポートされるコーデック: none, lz4。0.5.0
spark.clickhouse.read.distributed.convertLocaltrueDistributed テーブルを読み込む際は、自身ではなくローカルテーブルを読み込みます。true の場合、spark.clickhouse.read.distributed.useClusterNodes は無視されます。0.1.0
spark.clickhouse.read.fixedStringAsバイナリClickHouse の FixedString 型を指定した Spark データ型として読み取ります。サポートされる型:binary、string0.8.0
spark.clickhouse.read.formatjson読み取り用のシリアライズ形式。サポートされる形式: JSON, Binary0.6.0
spark.clickhouse.read.runtimeFilter.enabledfalse読み取り用のランタイムフィルターを有効化します。0.8.0
spark.clickhouse.read.splitByPartitionIdtruetrue の場合、パーティション値ではなく仮想カラム _partition_id を使って入力パーティションフィルタを構成します。パーティション値によって SQL の述語を組み立てる場合には、既知の問題があります。この機能には ClickHouse Server v21.6 以降が必要です。0.4.0
spark.clickhouse.useNullableQuerySchemafalsetrue の場合、テーブル作成時に CREATE/REPLACE TABLE ... AS SELECT ... を実行すると、クエリスキーマ内のすべてのフィールドを nullable としてマークします。なお、この設定には SPARK-43390(Spark 3.5 で利用可能)が必要であり、このパッチがない場合は設定値に関係なく常に true として動作します。0.8.0
spark.clickhouse.write.batchSize10000ClickHouse への書き込み時に、1 バッチあたりに含めるレコード数。0.1.0
spark.clickhouse.write.compression.codeclz4書き込み時にデータを圧縮するためのコーデック。サポートされているコーデックは none と lz4 です。0.3.0
spark.clickhouse.write.distributed.convertLocalfalseDistributed テーブルに書き込む際は、自身ではなくローカルテーブルに書き込みます。true の場合、spark.clickhouse.write.distributed.useClusterNodes を無視します。0.1.0
spark.clickhouse.write.distributed.useClusterNodestrueDistributed テーブルへの書き込み時に、クラスタ内のすべてのノードに書き込む。0.1.0
spark.clickhouse.write.format矢印書き込み時のシリアル化形式。サポートされる形式: JSON、Arrow0.4.0
spark.clickhouse.write.localSortByKeytruetrue の場合、書き込み前にソートキーに基づいてローカルでソートを行います。0.3.0
spark.clickhouse.write.localSortByPartitionspark.clickhouse.write.repartitionByPartition の値true の場合、書き込み前にローカルでパーティションごとにソートを行います。設定されていない場合は、spark.clickhouse.write.repartitionByPartition と同じ値になります。0.3.0
spark.clickhouse.write.maxRetry3再試行可能なエラーコードによって単一バッチ書き込みが失敗した場合に、その書き込みを再試行する最大回数。0.1.0
spark.clickhouse.write.repartitionByPartitiontrue書き込み前に、ClickHouse テーブルのパーティション分布に合わせて ClickHouse のパーティションキーでデータを再パーティションするかどうか。0.3.0
spark.clickhouse.write.repartitionNum0書き込み前に ClickHouse テーブルのディストリビューションに合うようデータを再パーティションする必要がある場合に、この設定で再パーティション数を指定します。値が 1 未満の場合は、再パーティションを要求しないことを意味します。0.1.0
spark.clickhouse.write.repartitionStrictlyfalsetrue の場合、Spark は書き込み時にデータソーステーブルへレコードを渡す前に、要求されるデータ分散を満たすよう、入力レコードを厳密にパーティション間へ分配します。true でない場合、Spark はクエリを高速化するために特定の最適化を適用することがありますが、その結果、分散要件が満たされないことがあります。なお、この設定は SPARK-37523(Spark 3.4 で利用可能)の適用が前提であり、このパッチがない場合は常に true として動作します。0.3.0
spark.clickhouse.write.retryInterval10秒書き込み再試行間隔(秒)0.1.0
spark.clickhouse.write.retryableErrorCodes241書き込み処理が失敗した際に ClickHouse サーバーから返される再試行可能なエラーコード。0.1.0

サポートされるデータ型

このセクションでは、Spark と ClickHouse の間のデータ型のマッピングについて説明します。以下の表は、ClickHouse から Spark への読み込み時および Spark から ClickHouse への挿入時にデータ型を変換するためのクイックリファレンスを提供します。

ClickHouse から Spark へのデータ読み込み

ClickHouse Data TypeSpark Data TypeSupportedIs PrimitiveNotes
NothingNullTypeYes
BoolBooleanTypeYes
UInt8, Int16ShortTypeYes
Int8ByteTypeYes
UInt16,Int32IntegerTypeYes
UInt32,Int64, UInt64LongTypeYes
Int128,UInt128, Int256, UInt256DecimalType(38, 0)Yes
Float32FloatTypeYes
Float64DoubleTypeYes
String, JSON, UUID, Enum8, Enum16, IPv4, IPv6StringTypeYes
FixedStringBinaryType, StringTypeYes設定 READ_FIXED_STRING_AS によって制御されます
DecimalDecimalTypeYesDecimal128 までの精度およびスケール
Decimal32DecimalType(9, scale)Yes
Decimal64DecimalType(18, scale)Yes
Decimal128DecimalType(38, scale)Yes
Date, Date32DateTypeYes
DateTime, DateTime32, DateTime64TimestampTypeYes
ArrayArrayTypeNo配列要素の型も変換されます
MapMapTypeNoキーは StringType に限定されます
IntervalYearYearMonthIntervalType(Year)Yes
IntervalMonthYearMonthIntervalType(Month)Yes
IntervalDay, IntervalHour, IntervalMinute, IntervalSecondDayTimeIntervalTypeNo対応する個別のインターバル型が使用されます
Object
Nested
Tuple
Point
Polygon
MultiPolygon
Ring
IntervalQuarter
IntervalWeek
Decimal256
AggregateFunction
SimpleAggregateFunction

Spark から ClickHouse へのデータ挿入

Spark Data TypeClickHouse Data Typeサポート有無プリミティブ型備考
BooleanTypeUInt8はい
ByteTypeInt8はい
ShortTypeInt16はい
IntegerTypeInt32はい
LongTypeInt64はい
FloatTypeFloat32はい
DoubleTypeFloat64はい
StringTypeStringはい
VarcharTypeStringはい
CharTypeStringはい
DecimalTypeDecimal(p, s)はい精度とスケールは Decimal128 までサポート
DateTypeDateはい
TimestampTypeDateTimeはい
ArrayType (list, tuple, or array)Arrayいいえ配列要素の型も変換される
MapTypeMapいいえキーは StringType のみ
Object
Nested

貢献とサポート

プロジェクトへの貢献や問題の報告を希望される場合は、ぜひご協力ください。 GitHub リポジトリ にアクセスし、Issue の作成、改善提案、 または Pull Request の送信を行ってください。 貢献はいつでも歓迎しています。作業を始める前に、リポジトリ内の貢献ガイドラインをご確認ください。 ClickHouse Spark コネクタの改善にご協力いただきありがとうございます。