Spark JDBC
JDBC は Spark で最も一般的に使用されるデータソースの 1 つです。 このセクションでは、Spark で ClickHouse 公式 JDBC コネクタ を使用する方法について詳しく説明します。
データの読み取り
- Java
- Scala
- Python
JARファイルを使用したSparkセッションの初期化
spark = SparkSession.builder
.appName("example")
.master("local")
.config("spark.jars", ",".join(jar_files))
.getOrCreate()
url = "jdbc:ch://localhost:8123/default"
user = "your_user"
password = "your_password"
query = "select * from example_table where id > 2"
driver = "com.clickhouse.jdbc.ClickHouseDriver"
df = (spark.read .format('jdbc') .option('driver', driver) .option('url', url) .option('user', user) .option('password', password).option( 'query', query).load())
df.show()
データの書き込み
- Java
- Scala
- Python
JAR ファイルを指定して Spark セッションを初期化する
spark = SparkSession.builder
.appName("example")
.master("local")
.config("spark.jars", ",".join(jar_files))
.getOrCreate()
DataFrame を作成
data = [Row(id=11, name="John"), Row(id=12, name="Doe")] df = spark.createDataFrame(data)
url = "jdbc:ch://localhost:8123/default"
user = "your_user"
password = "your_password"
driver = "com.clickhouse.jdbc.ClickHouseDriver"
DataFrameをClickHouseに書き込む
df.write
.format("jdbc")
.option("driver", driver)
.option("url", url)
.option("user", user)
.option("password", password)
.option("dbtable", "example_table")
.mode("append")
.save()
並列処理
Spark JDBC を使用する場合、Spark はデータを単一のパーティションで読み込みます。より高い並行性を得るには、
partitionColumn、lowerBound、upperBound、numPartitions を指定し、複数のワーカーから並列に読み込むための
テーブルのパーティション方法を定義する必要があります。
詳細については、JDBC 設定に関する
Apache Spark 公式ドキュメントを参照してください。
JDBC の制限事項
- 現時点では、JDBC 経由でデータを挿入できるのは既存のテーブルに対してのみです(Spark が他のコネクタで行っているような、DataFrame 挿入時のテーブル自動作成は現時点ではサポートされていません)。