跳到主要内容
跳到主要内容

Spark JDBC

ClickHouse Supported

JDBC 是 Spark 中最常用的数据源之一。 在本节中,我们将介绍如何使用 ClickHouse 官方 JDBC 连接器 与 Spark 集成。

读取数据

public static void main(String[] args) {
        // 初始化 Spark 会话
        SparkSession spark = SparkSession.builder().appName("example").master("local").getOrCreate();

        String jdbcURL = "jdbc:ch://localhost:8123/default";
        String query = "select * from example_table where id > 2";

        //---------------------------------------------------------------------------------------------------
        // 使用 JDBC 从 ClickHouse 读取表数据
        //---------------------------------------------------------------------------------------------------
        Properties jdbcProperties = new Properties();
        jdbcProperties.put("user", "default");
        jdbcProperties.put("password", "123456");

        Dataset<Row> df1 = spark.read().jdbc(jdbcURL, String.format("(%s)", query), jdbcProperties);

        df1.show();

        //---------------------------------------------------------------------------------------------------
        // 使用 load 方法从 ClickHouse 读取表数据
        //---------------------------------------------------------------------------------------------------
        Dataset<Row> df2 = spark.read()
                .format("jdbc")
                .option("url", jdbcURL)
                .option("user", "default")
                .option("password", "123456")
                .option("query", query)
                .load();

        df2.show();

        // 停止 Spark 会话
        spark.stop();
    }

写入数据

 public static void main(String[] args) {
        // 初始化 Spark 会话
        SparkSession spark = SparkSession.builder().appName("example").master("local").getOrCreate();

        // JDBC 连接详细信息
        String jdbcUrl = "jdbc:ch://localhost:8123/default";
        Properties jdbcProperties = new Properties();
        jdbcProperties.put("user", "default");
        jdbcProperties.put("password", "123456");

        // 创建示例 DataFrame
        StructType schema = new StructType(new StructField[]{
                DataTypes.createStructField("id", DataTypes.IntegerType, false),
                DataTypes.createStructField("name", DataTypes.StringType, false)
        });

        List<Row> rows = new ArrayList<Row>();
        rows.add(RowFactory.create(1, "John"));
        rows.add(RowFactory.create(2, "Doe"));

        Dataset<Row> df = spark.createDataFrame(rows, schema);

        //---------------------------------------------------------------------------------------------------
        // 使用 jdbc 方法将 DataFrame 写入 ClickHouse
        //---------------------------------------------------------------------------------------------------

        df.write()
                .mode(SaveMode.Append)
                .jdbc(jdbcUrl, "example_table", jdbcProperties);

        //---------------------------------------------------------------------------------------------------
        // 使用 save 方法将 DataFrame 写入 ClickHouse
        //---------------------------------------------------------------------------------------------------

        df.write()
                .format("jdbc")
                .mode("append")
                .option("url", jdbcUrl)
                .option("dbtable", "example_table")
                .option("user", "default")
                .option("password", "123456")
                .save();

        // 停止 Spark 会话
        spark.stop();
    }

并行度

使用 Spark JDBC 时,Spark 默认只使用单个分区读取数据。要获得更高的并行度,你需要指定 partitionColumnlowerBoundupperBoundnumPartitions,用于定义在由多个 worker 并行读取时如何对表进行分区。 如需了解更多信息,请参阅 Apache Spark 官方文档中的 JDBC 配置

JDBC 限制

  • 截至目前,通过 JDBC 只能将数据插入到已存在的表中(目前无法像 Spark 使用其他连接器那样,在插入 DataFrame 时自动创建表)。