次の方法で共有


チュートリアル: Spark を使用して Azure Cosmos DB for NoSQL に接続する

適用対象: NoSQL

このチュートリアルでは、Azure Cosmos DB Spark コネクタを使って、Azure Cosmos DB for NoSQL アカウントのデータの読み取りや書き込みを行います。 このチュートリアルでは、Azure Databricks と Jupyter ノートブックを使って、Spark から API for NoSQL と統合する方法を見ていただきます。 Spark でサポートされている任意の言語またはインターフェイスを使用できますが、このチュートリアルでは Python と Scala に焦点を当てます。

このチュートリアルでは、次の作業を行う方法について説明します。

  • Spark と Jupyter ノートブックを使って API for NoSQL アカウントに接続します。
  • データベースとコンテナーのリソースを作成します。
  • データをコンテナーに取り込みます。
  • コンテナー内のデータのクエリを実行します。
  • コンテナー内の項目に対して一般的な操作を実行します。

Prerequisites

  • 既存の Azure Cosmos DB for NoSQL アカウント。
  • 既存の Azure Databricks ワークスペース。

Spark と Jupyter を使用して接続する

Apache Spark 3.4.x を使って Azure Cosmos DB for NoSQL アカウントに接続できる状態のコンピューティング クラスターを、既存の Azure Databricks ワークスペースを使って作成します。

  1. Azure Databricks ワークスペースを開きます。

  2. In the workspace interface, create a new cluster. 少なくとも次の設定でクラスターを構成します。

    Version Value
    Runtime version 13.3 LTS (Scala 2.12、Spark 3.4.1)
  3. Use the workspace interface to search for Maven packages from Maven Central with a Group ID of com.azure.cosmos.spark. Install the package specifically for Spark 3.4 with an Artifact ID prefixed with azure-cosmos-spark_3-4 to the cluster.

  4. Finally, create a new notebook.

    Tip

    既定では、ノートブックは最近作成されたクラスターにアタッチされます。

  5. ノートブック内で、NoSQL アカウント エンドポイント、データベース名、コンテナー名のオンライン トランザクション処理 (OLTP) 構成設定を設定します。

    # Set configuration settings
    config = {
      "spark.cosmos.accountEndpoint": "<nosql-account-endpoint>",
      "spark.cosmos.accountKey": "<nosql-account-key>",
      "spark.cosmos.database": "cosmicworks",
      "spark.cosmos.container": "products"
    }
    
    # Set configuration settings
    val config = Map(
      "spark.cosmos.accountEndpoint" -> "<nosql-account-endpoint>",
      "spark.cosmos.accountKey" -> "<nosql-account-key>",
      "spark.cosmos.database" -> "cosmicworks",
      "spark.cosmos.container" -> "products"
    )
    

データベースとコンテナーを作成する

Catalog API を使って、データベースやコンテナーなどのアカウント リソースを管理します。 その後、OLTP を使ってコンテナー リソース内のデータを管理できます。

  1. Spark を使って API for NoSQL リソースを管理するように Catalog API を構成します。

    # Configure Catalog Api    
    spark.conf.set("spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog")
    spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", config["spark.cosmos.accountEndpoint"])
    spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey", config["spark.cosmos.accountKey"]) 
    
    // Configure Catalog Api  
    spark.conf.set(s"spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog")
    spark.conf.set(s"spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", config("spark.cosmos.accountEndpoint"))
    spark.conf.set(s"spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey", config("spark.cosmos.accountKey"))
    
  2. cosmicworks を使用して、CREATE DATABASE IF NOT EXISTS という新しいデータベースを作成します。

    # Create a database by using the Catalog API    
    spark.sql(f"CREATE DATABASE IF NOT EXISTS cosmosCatalog.cosmicworks;")
    
    // Create a database by using the Catalog API  
    spark.sql(s"CREATE DATABASE IF NOT EXISTS cosmosCatalog.cosmicworks;")
    
  3. products を使って、CREATE TABLE IF NOT EXISTS という名前の新しいコンテナーを作成します。 パーティション キーのパスを /category に設定し、最大スループットを 1000 1 秒あたりの要求ユニット数 (RU) にして自動スケーリング スループットを有効にします。

    # Create a products container by using the Catalog API
    spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.products USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/category', autoScaleMaxThroughput = '1000')"))
    
    // Create a products container by using the Catalog API
    spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.products USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/category', autoScaleMaxThroughput = '1000')"))
    
  4. 階層的なパーティション キー構成を使用して、employees という名前を付けた別のコンテナーを作成します。 パーティション キー パスのセットとして、/organization/department/team を使用します。 その特定の順序に従います。 また、スループットを手動の数の 400 RU に設定します。

    # Create an employees container by using the Catalog API
    spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.employees USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/organization,/department,/team', manualThroughput = '400')"))
    
    // Create an employees container by using the Catalog API
    spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.employees USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/organization,/department,/team', manualThroughput = '400')"))
    
  5. ノートブックのセルを実行して、API for NoSQL アカウント内にデータベースとコンテナーが作成されることを検証します。

Ingest data

サンプル データセットを作成します。 次に、OLTP を使用して、そのデータを NoSQL コンテナー用 API に取り込みます。

  1. サンプル データセットを作成します。

    # Create sample data    
    products = (
      ("68719518391", "gear-surf-surfboards", "Yamba Surfboard", 12, 850.00, False),
      ("68719518371", "gear-surf-surfboards", "Kiama Classic Surfboard", 25, 790.00, True)
    )
    
    // Create sample data
    val products = Seq(
      ("68719518391", "gear-surf-surfboards", "Yamba Surfboard", 12, 850.00, false),
      ("68719518371", "gear-surf-surfboards", "Kiama Classic Surfboard", 25, 790.00, true)
    )
    
  2. spark.createDataFrame と前に保存した OLTP 構成を使って、サンプル データをターゲット コンテナーに追加します。

    # Ingest sample data    
    spark.createDataFrame(products) \
      .toDF("id", "category", "name", "quantity", "price", "clearance") \
      .write \
      .format("cosmos.oltp") \
      .options(**config) \
      .mode("APPEND") \
      .save()
    
    // Ingest sample data
    spark.createDataFrame(products)
      .toDF("id", "category", "name", "quantity", "price", "clearance")
      .write
      .format("cosmos.oltp")
      .options(config)
      .mode("APPEND")
      .save()
    

Query data

OLTP データをデータ フレームに読み込み、データに対して一般的なクエリを実行します。 さまざまな構文を使って、データのフィルターまたはクエリを実行できます。

  1. spark.read を使って、OLTP データをデータフレーム オブジェクトに読み込みます。 このチュートリアルで前に使ったのと同じ構成を使います。 また、spark.cosmos.read.inferSchema.enabledtrue に設定して、Spark コネクタが既存の項目をサンプリングしてスキーマを推論できるようにします。

    # Load data    
    df = spark.read.format("cosmos.oltp") \
      .options(**config) \
      .option("spark.cosmos.read.inferSchema.enabled", "true") \
      .load()
    
    // Load data
    val df = spark.read.format("cosmos.oltp")
      .options(config)
      .option("spark.cosmos.read.inferSchema.enabled", "true")
      .load()
    
  2. printSchema を使って、データフレームに読み込まれたデータのスキーマをレンダリングします。

    # Render schema    
    df.printSchema()
    
    // Render schema    
    df.printSchema()
    
  3. quantity 列が 20 未満のデータ行をレンダリングします。 このクエリを実行するには、whereshow 関数を使います。

    # Render filtered data    
    df.where("quantity < 20") \
      .show()
    
    // Render filtered data
    df.where("quantity < 20")
      .show()
    
  4. clearance 列が true である最初のデータ行をレンダリングします。 このクエリを実行するには、filter 関数を使います。

    # Render 1 row of flitered data    
    df.filter(df.clearance == True) \
      .show(1)
    
    // Render 1 row of flitered data
    df.filter($"clearance" === true)
      .show(1)
    
  5. フィルターまたは切り詰めなしで、5 行のデータをレンダリングします。 show 関数を使って、外観とレンダリングされる行数をカスタマイズします。

    # Render five rows of unfiltered and untruncated data    
    df.show(5, False)
    
    // Render five rows of unfiltered and untruncated data    
    df.show(5, false)
    
  6. 次の生の NoSQL クエリ文字列を使ってデータのクエリを実行します: SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800

    # Render results of raw query    
    rawQuery = "SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800"
    rawDf = spark.sql(rawQuery)
    rawDf.show()
    
    // Render results of raw query    
    val rawQuery = s"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800"
    val rawDf = spark.sql(rawQuery)
    rawDf.show()
    

一般的な操作を実行する

Spark で API for NoSQL データを使用するときは、生の JSON として部分的な更新を実行したりデータを操作したりできます。

  1. 項目の部分的な更新を実行するには、次の操作を実行します。

    1. 既存の config 構成変数をコピーし、新しいコピーでプロパティを変更します。 具体的には、ItemPatch への書き込み戦略を構成します。 その後、一括サポートを無効にします。 列とマップされた操作を設定します。 最後に、既定の操作の種類を Set に設定します。

      # Copy and modify configuration
      configPatch = dict(config)
      configPatch["spark.cosmos.write.strategy"] = "ItemPatch"
      configPatch["spark.cosmos.write.bulk.enabled"] = "false"
      configPatch["spark.cosmos.write.patch.defaultOperationType"] = "Set"
      configPatch["spark.cosmos.write.patch.columnConfigs"] = "[col(name).op(set)]"
      
      // Copy and modify configuration
      val configPatch = scala.collection.mutable.Map.empty ++ config
      configPatch ++= Map(
        "spark.cosmos.write.strategy" -> "ItemPatch",
        "spark.cosmos.write.bulk.enabled" -> "false",
        "spark.cosmos.write.patch.defaultOperationType" -> "Set",
        "spark.cosmos.write.patch.columnConfigs" -> "[col(name).op(set)]"
      )
      
    2. このパッチ操作の一部としてターゲットにする項目のパーティション キーと一意識別子のための変数を作成します。

      # Specify target item id and partition key
      targetItemId = "68719518391"
      targetItemPartitionKey = "gear-surf-surfboards"
      
      // Specify target item id and partition key
      val targetItemId = "68719518391"
      val targetItemPartitionKey = "gear-surf-surfboards"
      
    3. ターゲット項目を指定し、変更する必要があるフィールドを指定するための、パッチ オブジェクトのセットを作成します。

      # Create set of patch diffs
      patchProducts = [{ "id": f"{targetItemId}", "category": f"{targetItemPartitionKey}", "name": "Yamba New Surfboard" }]
      
      // Create set of patch diffs
      val patchProducts = Seq(
        (targetItemId, targetItemPartitionKey, "Yamba New Surfboard")
      )
      
    4. パッチ オブジェクトのセットを使用してデータ フレームを作成します。 write を使用してパッチ操作を実行します。

      # Create data frame
      spark.createDataFrame(patchProducts) \
        .write \
        .format("cosmos.oltp") \
        .options(**configPatch) \
        .mode("APPEND") \
        .save()
      
      // Create data frame
      patchProducts
        .toDF("id", "category", "name")
        .write
        .format("cosmos.oltp")
        .options(configPatch)
        .mode("APPEND")
        .save()
      
    5. クエリを実行して、パッチ操作の結果を確認します。 これで、項目は Yamba New Surfboard という名前になり、それ以外は変更されていないはずです。

      # Create and run query
      patchQuery = f"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE id = '{targetItemId}' AND category = '{targetItemPartitionKey}'"
      patchDf = spark.sql(patchQuery)
      patchDf.show(1)
      
      // Create and run query
      val patchQuery = s"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE id = '$targetItemId' AND category = '$targetItemPartitionKey'"
      val patchDf = spark.sql(patchQuery)
      patchDf.show(1)
      
  2. 生の JSON データを操作するには、次の操作を実行します。

    1. 既存の config 構成変数をコピーし、新しいコピーでプロパティを変更します。 具体的には、ターゲット コンテナーを employees に変更します。 次に、未加工の JSON データを使用するように、contacts 列/フィールドを構成します。

      # Copy and modify configuration
      configRawJson = dict(config)
      configRawJson["spark.cosmos.container"] = "employees"
      configRawJson["spark.cosmos.write.patch.columnConfigs"] = "[col(contacts).path(/contacts).op(set).rawJson]"
      
      // Copy and modify configuration
      val configRawJson = scala.collection.mutable.Map.empty ++ config
      configRawJson ++= Map(
        "spark.cosmos.container" -> "employees",
        "spark.cosmos.write.patch.columnConfigs" -> "[col(contacts).path(/contacts).op(set).rawJson]"
      )
      
    2. コンテナーに取り込むための一連の従業員を作成します。

      # Create employee data
      employees = (
        ("63476388581", "CosmicWorks", "Marketing", "Outside Sales", "Alain Henry",  '[ { "type": "phone", "value": "425-555-0117" }, { "email": "alain@adventure-works.com" } ]'), 
      )
      
      // Create employee data
      val employees = Seq(
        ("63476388581", "CosmicWorks", "Marketing", "Outside Sales", "Alain Henry",  """[ { "type": "phone", "value": "425-555-0117" }, { "email": "alain@adventure-works.com" } ]""")
      )
      
    3. データ フレームを作成し、write を使って従業員データを取り込みます。

      # Ingest data
      spark.createDataFrame(employees) \
        .toDF("id", "organization", "department", "team", "name", "contacts") \
        .write \
        .format("cosmos.oltp") \
        .options(**configRawJson) \
        .mode("APPEND") \
        .save()
      
      // Ingest data
      spark.createDataFrame(employees)
        .toDF("id", "organization", "department", "team", "name", "contacts")
        .write
        .format("cosmos.oltp")
        .options(configRawJson)
        .mode("APPEND")
        .save()
      
    4. show を使ってデータ フレームからデータをレンダリングします。 contacts 列が出力で生の JSON であることを確認します。

      # Read and render data
      rawJsonDf = spark.read.format("cosmos.oltp") \
        .options(**configRawJson) \
        .load()
      rawJsonDf.show()
      
      // Read and render data
      val rawJsonDf = spark.read.format("cosmos.oltp")
        .options(configRawJson)
        .load()
      rawJsonDf.show()
      

Next step