跳转到主内容
跳转到主内容

Amazon Glue 与 ClickHouse 和 Spark 的集成

ClickHouse Supported

Amazon Glue 是 Amazon Web Services (AWS) 提供的一项全托管的无服务器数据集成服务。它简化了数据发现、准备和转换流程,可用于分析、机器学习和应用开发。

安装

要将您的 Glue 代码与 ClickHouse 集成,您可以通过以下任一方式在 Glue 中使用我们的官方 Spark 连接器:

  • 从 AWS Marketplace 安装 ClickHouse Glue 连接器 (推荐) 。
  • 将 Spark 连接器的 jar 包手动添加到您的 Glue 作业中。
  1. 订阅连接器

    要在您的账户中使用该连接器,请先在 AWS Marketplace 订阅 ClickHouse AWS Glue Connector。

  2. 授予所需权限

    确保您的 Glue 作业所使用的 IAM 角色具有所需权限,详见最低权限指南

  3. 激活连接器并创建连接

    您可以点击此链接直接激活该连接器并创建连接。该链接会打开 Glue 的连接创建页面,并预先填好关键字段。为该连接命名后,点击 create (此阶段无需提供 ClickHouse 连接详细信息) 。

  4. 在 Glue 作业中使用

    在您的 Glue 作业中,选择 Job details 选项卡,然后展开 Advanced properties 窗格。在 Connections 部分中,选择您刚刚创建的连接。该连接器会自动将所需的 JAR 注入作业运行时。

Glue Notebook 连接配置
注意

Glue 连接器中使用的 JAR 是为 Spark 3.3Scala 2Python 3 构建的。配置 Glue 作业时,请确保选择这些版本。

示例

import com.amazonaws.services.glue.GlueContext
import com.amazonaws.services.glue.util.GlueArgParser
import com.amazonaws.services.glue.util.Job
import com.clickhouseScala.Native.NativeSparkRead.spark
import org.apache.spark.sql.SparkSession

import scala.collection.JavaConverters._
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._

object ClickHouseGlueExample {
  def main(sysArgs: Array[String]) {
    val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray)

    val sparkSession: SparkSession = SparkSession.builder
      .config("spark.sql.catalog.clickhouse", "com.clickhouse.spark.ClickHouseCatalog")
      .config("spark.sql.catalog.clickhouse.host", "<your-clickhouse-host>")
      .config("spark.sql.catalog.clickhouse.protocol", "https")
      .config("spark.sql.catalog.clickhouse.http_port", "<your-clickhouse-port>")
      .config("spark.sql.catalog.clickhouse.user", "default")
      .config("spark.sql.catalog.clickhouse.password", "<your-password>")
      .config("spark.sql.catalog.clickhouse.database", "default")
      // for ClickHouse cloud
      .config("spark.sql.catalog.clickhouse.option.ssl", "true")
      .config("spark.sql.catalog.clickhouse.option.ssl_mode", "NONE")
      .getOrCreate

    val glueContext = new GlueContext(sparkSession.sparkContext)
    Job.init(args("JOB_NAME"), glueContext, args.asJava)
    import sparkSession.implicits._

    val url = "s3://{path_to_cell_tower_data}/cell_towers.csv.gz"

    val schema = StructType(Seq(
      StructField("radio", StringType, nullable = false),
      StructField("mcc", IntegerType, nullable = false),
      StructField("net", IntegerType, nullable = false),
      StructField("area", IntegerType, nullable = false),
      StructField("cell", LongType, nullable = false),
      StructField("unit", IntegerType, nullable = false),
      StructField("lon", DoubleType, nullable = false),
      StructField("lat", DoubleType, nullable = false),
      StructField("range", IntegerType, nullable = false),
      StructField("samples", IntegerType, nullable = false),
      StructField("changeable", IntegerType, nullable = false),
      StructField("created", TimestampType, nullable = false),
      StructField("updated", TimestampType, nullable = false),
      StructField("averageSignal", IntegerType, nullable = false)
    ))

    val df = sparkSession.read
      .option("header", "true")
      .schema(schema)
      .csv(url)

    // Write to ClickHouse
    df.writeTo("clickhouse.default.cell_towers").append()


    // Read from ClickHouse
    val dfRead = spark.sql("select * from clickhouse.default.cell_towers")
    Job.commit()
  }
}

如需了解更多信息,请参阅我们的Spark 文档