メインコンテンツへスキップ
メインコンテンツへスキップ

Amazon Glue を ClickHouse および Spark と統合する

ClickHouse Supported

Amazon Glue は、Amazon Web Services (AWS) が提供するフルマネージドなサーバーレスのデータ統合サービスです。分析、機械学習、アプリケーション開発のためのデータの検出、準備、変換といった処理を簡素化します。

インストール

Glue のコードを ClickHouse と連携させるには、次のいずれかの方法で Glue から公式 Spark コネクタを利用できます。

  • AWS Marketplace から ClickHouse Glue コネクタをインストールする(推奨)。
  • Spark コネクタの JAR を手動で Glue ジョブに追加する。
  1. コネクタをサブスクライブする

    自分のアカウントでコネクタにアクセスするには、AWS Marketplace から ClickHouse AWS Glue Connector をサブスクライブします。

  2. 必要な権限を付与する

    Glue ジョブの IAM ロールに、最小権限のガイドで説明されている必要な権限が付与されていることを確認します。

  3. コネクタを有効化して接続を作成する

    このリンクをクリックすると、主要な項目があらかじめ入力された状態で Glue の接続作成ページが開き、そこからコネクタを有効化して接続を作成できます。接続に名前を付けて「作成」をクリックします(この段階では ClickHouse の接続情報を入力する必要はありません)。

  4. Glue ジョブでの利用

    Glue ジョブで Job details タブを選択し、Advanced properties ウィンドウを展開します。Connections セクションで、先ほど作成した接続を選択します。コネクタは、必要な JAR をジョブのランタイムに自動的に注入します。

Glue Notebook connections 設定
注記

Glue コネクタで使用される JAR は、Spark 3.3Scala 2Python 3 向けにビルドされています。Glue ジョブを設定する際は、必ずこれらのバージョンを選択してください。

使用例

import com.amazonaws.services.glue.GlueContext
import com.amazonaws.services.glue.util.GlueArgParser
import com.amazonaws.services.glue.util.Job
import com.clickhouseScala.Native.NativeSparkRead.spark
import org.apache.spark.sql.SparkSession

import scala.collection.JavaConverters._
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._

object ClickHouseGlueExample {
  def main(sysArgs: Array[String]) {
    val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray)

    val sparkSession: SparkSession = SparkSession.builder
      .config("spark.sql.catalog.clickhouse", "com.clickhouse.spark.ClickHouseCatalog")
      .config("spark.sql.catalog.clickhouse.host", "<your-clickhouse-host>")
      .config("spark.sql.catalog.clickhouse.protocol", "https")
      .config("spark.sql.catalog.clickhouse.http_port", "<your-clickhouse-port>")
      .config("spark.sql.catalog.clickhouse.user", "default")
      .config("spark.sql.catalog.clickhouse.password", "<your-password>")
      .config("spark.sql.catalog.clickhouse.database", "default")
      // ClickHouse Cloud 用
      .config("spark.sql.catalog.clickhouse.option.ssl", "true")
      .config("spark.sql.catalog.clickhouse.option.ssl_mode", "NONE")
      .getOrCreate

    val glueContext = new GlueContext(sparkSession.sparkContext)
    Job.init(args("JOB_NAME"), glueContext, args.asJava)
    import sparkSession.implicits._

    val url = "s3://{path_to_cell_tower_data}/cell_towers.csv.gz"

    val schema = StructType(Seq(
      StructField("radio", StringType, nullable = false),
      StructField("mcc", IntegerType, nullable = false),
      StructField("net", IntegerType, nullable = false),
      StructField("area", IntegerType, nullable = false),
      StructField("cell", LongType, nullable = false),
      StructField("unit", IntegerType, nullable = false),
      StructField("lon", DoubleType, nullable = false),
      StructField("lat", DoubleType, nullable = false),
      StructField("range", IntegerType, nullable = false),
      StructField("samples", IntegerType, nullable = false),
      StructField("changeable", IntegerType, nullable = false),
      StructField("created", TimestampType, nullable = false),
      StructField("updated", TimestampType, nullable = false),
      StructField("averageSignal", IntegerType, nullable = false)
    ))

    val df = sparkSession.read
      .option("header", "true")
      .schema(schema)
      .csv(url)

    // ClickHouse への書き込み
    df.writeTo("clickhouse.default.cell_towers").append()


    // ClickHouse からの読み取り
    val dfRead = spark.sql("select * from clickhouse.default.cell_towers")
    Job.commit()
  }
}

詳しくは、Spark のドキュメントをご覧ください。