適用対象: NoSQL
このチュートリアルでは、Azure Cosmos DB Spark コネクタを使って、Azure Cosmos DB for NoSQL アカウントのデータの読み取りや書き込みを行います。 このチュートリアルでは、Azure Databricks と Jupyter ノートブックを使って、Spark から API for NoSQL と統合する方法を見ていただきます。 Spark でサポートされている任意の言語またはインターフェイスを使用できますが、このチュートリアルでは Python と Scala に焦点を当てます。
このチュートリアルでは、次の作業を行う方法について説明します。
- Spark と Jupyter ノートブックを使って API for NoSQL アカウントに接続します。
- データベースとコンテナーのリソースを作成します。
- データをコンテナーに取り込みます。
- コンテナー内のデータのクエリを実行します。
- コンテナー内の項目に対して一般的な操作を実行します。
Prerequisites
- 既存の Azure Cosmos DB for NoSQL アカウント。
- Azure サブスクリプションを既にお持ちの場合は、新しいアカウントを作成します。
- Azure サブスクリプションがない場合。 Azure Cosmos DB を無料で試すことができます。クレジット カードは必要ありません。
- 既存の Azure Databricks ワークスペース。
Spark と Jupyter を使用して接続する
Apache Spark 3.4.x を使って Azure Cosmos DB for NoSQL アカウントに接続できる状態のコンピューティング クラスターを、既存の Azure Databricks ワークスペースを使って作成します。
Azure Databricks ワークスペースを開きます。
In the workspace interface, create a new cluster. 少なくとも次の設定でクラスターを構成します。
Version Value Runtime version 13.3 LTS (Scala 2.12、Spark 3.4.1) Use the workspace interface to search for Maven packages from Maven Central with a Group ID of
com.azure.cosmos.spark
. Install the package specifically for Spark 3.4 with an Artifact ID prefixed withazure-cosmos-spark_3-4
to the cluster.Finally, create a new notebook.
Tip
既定では、ノートブックは最近作成されたクラスターにアタッチされます。
ノートブック内で、NoSQL アカウント エンドポイント、データベース名、コンテナー名のオンライン トランザクション処理 (OLTP) 構成設定を設定します。
# Set configuration settings config = { "spark.cosmos.accountEndpoint": "<nosql-account-endpoint>", "spark.cosmos.accountKey": "<nosql-account-key>", "spark.cosmos.database": "cosmicworks", "spark.cosmos.container": "products" }
# Set configuration settings val config = Map( "spark.cosmos.accountEndpoint" -> "<nosql-account-endpoint>", "spark.cosmos.accountKey" -> "<nosql-account-key>", "spark.cosmos.database" -> "cosmicworks", "spark.cosmos.container" -> "products" )
データベースとコンテナーを作成する
Catalog API を使って、データベースやコンテナーなどのアカウント リソースを管理します。 その後、OLTP を使ってコンテナー リソース内のデータを管理できます。
Spark を使って API for NoSQL リソースを管理するように Catalog API を構成します。
# Configure Catalog Api spark.conf.set("spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog") spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", config["spark.cosmos.accountEndpoint"]) spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey", config["spark.cosmos.accountKey"])
// Configure Catalog Api spark.conf.set(s"spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog") spark.conf.set(s"spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", config("spark.cosmos.accountEndpoint")) spark.conf.set(s"spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey", config("spark.cosmos.accountKey"))
cosmicworks
を使用して、CREATE DATABASE IF NOT EXISTS
という新しいデータベースを作成します。# Create a database by using the Catalog API spark.sql(f"CREATE DATABASE IF NOT EXISTS cosmosCatalog.cosmicworks;")
// Create a database by using the Catalog API spark.sql(s"CREATE DATABASE IF NOT EXISTS cosmosCatalog.cosmicworks;")
products
を使って、CREATE TABLE IF NOT EXISTS
という名前の新しいコンテナーを作成します。 パーティション キーのパスを/category
に設定し、最大スループットを1000
1 秒あたりの要求ユニット数 (RU) にして自動スケーリング スループットを有効にします。# Create a products container by using the Catalog API spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.products USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/category', autoScaleMaxThroughput = '1000')"))
// Create a products container by using the Catalog API spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.products USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/category', autoScaleMaxThroughput = '1000')"))
階層的なパーティション キー構成を使用して、
employees
という名前を付けた別のコンテナーを作成します。 パーティション キー パスのセットとして、/organization
、/department
、/team
を使用します。 その特定の順序に従います。 また、スループットを手動の数の400
RU に設定します。# Create an employees container by using the Catalog API spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.employees USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/organization,/department,/team', manualThroughput = '400')"))
// Create an employees container by using the Catalog API spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.employees USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/organization,/department,/team', manualThroughput = '400')"))
ノートブックのセルを実行して、API for NoSQL アカウント内にデータベースとコンテナーが作成されることを検証します。
Ingest data
サンプル データセットを作成します。 次に、OLTP を使用して、そのデータを NoSQL コンテナー用 API に取り込みます。
サンプル データセットを作成します。
# Create sample data products = ( ("68719518391", "gear-surf-surfboards", "Yamba Surfboard", 12, 850.00, False), ("68719518371", "gear-surf-surfboards", "Kiama Classic Surfboard", 25, 790.00, True) )
// Create sample data val products = Seq( ("68719518391", "gear-surf-surfboards", "Yamba Surfboard", 12, 850.00, false), ("68719518371", "gear-surf-surfboards", "Kiama Classic Surfboard", 25, 790.00, true) )
spark.createDataFrame
と前に保存した OLTP 構成を使って、サンプル データをターゲット コンテナーに追加します。# Ingest sample data spark.createDataFrame(products) \ .toDF("id", "category", "name", "quantity", "price", "clearance") \ .write \ .format("cosmos.oltp") \ .options(**config) \ .mode("APPEND") \ .save()
// Ingest sample data spark.createDataFrame(products) .toDF("id", "category", "name", "quantity", "price", "clearance") .write .format("cosmos.oltp") .options(config) .mode("APPEND") .save()
Query data
OLTP データをデータ フレームに読み込み、データに対して一般的なクエリを実行します。 さまざまな構文を使って、データのフィルターまたはクエリを実行できます。
spark.read
を使って、OLTP データをデータフレーム オブジェクトに読み込みます。 このチュートリアルで前に使ったのと同じ構成を使います。 また、spark.cosmos.read.inferSchema.enabled
をtrue
に設定して、Spark コネクタが既存の項目をサンプリングしてスキーマを推論できるようにします。# Load data df = spark.read.format("cosmos.oltp") \ .options(**config) \ .option("spark.cosmos.read.inferSchema.enabled", "true") \ .load()
// Load data val df = spark.read.format("cosmos.oltp") .options(config) .option("spark.cosmos.read.inferSchema.enabled", "true") .load()
printSchema
を使って、データフレームに読み込まれたデータのスキーマをレンダリングします。# Render schema df.printSchema()
// Render schema df.printSchema()
quantity
列が20
未満のデータ行をレンダリングします。 このクエリを実行するには、where
とshow
関数を使います。# Render filtered data df.where("quantity < 20") \ .show()
// Render filtered data df.where("quantity < 20") .show()
clearance
列がtrue
である最初のデータ行をレンダリングします。 このクエリを実行するには、filter
関数を使います。# Render 1 row of flitered data df.filter(df.clearance == True) \ .show(1)
// Render 1 row of flitered data df.filter($"clearance" === true) .show(1)
フィルターまたは切り詰めなしで、5 行のデータをレンダリングします。
show
関数を使って、外観とレンダリングされる行数をカスタマイズします。# Render five rows of unfiltered and untruncated data df.show(5, False)
// Render five rows of unfiltered and untruncated data df.show(5, false)
次の生の NoSQL クエリ文字列を使ってデータのクエリを実行します:
SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800
# Render results of raw query rawQuery = "SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800" rawDf = spark.sql(rawQuery) rawDf.show()
// Render results of raw query val rawQuery = s"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800" val rawDf = spark.sql(rawQuery) rawDf.show()
一般的な操作を実行する
Spark で API for NoSQL データを使用するときは、生の JSON として部分的な更新を実行したりデータを操作したりできます。
項目の部分的な更新を実行するには、次の操作を実行します。
既存の
config
構成変数をコピーし、新しいコピーでプロパティを変更します。 具体的には、ItemPatch
への書き込み戦略を構成します。 その後、一括サポートを無効にします。 列とマップされた操作を設定します。 最後に、既定の操作の種類をSet
に設定します。# Copy and modify configuration configPatch = dict(config) configPatch["spark.cosmos.write.strategy"] = "ItemPatch" configPatch["spark.cosmos.write.bulk.enabled"] = "false" configPatch["spark.cosmos.write.patch.defaultOperationType"] = "Set" configPatch["spark.cosmos.write.patch.columnConfigs"] = "[col(name).op(set)]"
// Copy and modify configuration val configPatch = scala.collection.mutable.Map.empty ++ config configPatch ++= Map( "spark.cosmos.write.strategy" -> "ItemPatch", "spark.cosmos.write.bulk.enabled" -> "false", "spark.cosmos.write.patch.defaultOperationType" -> "Set", "spark.cosmos.write.patch.columnConfigs" -> "[col(name).op(set)]" )
このパッチ操作の一部としてターゲットにする項目のパーティション キーと一意識別子のための変数を作成します。
# Specify target item id and partition key targetItemId = "68719518391" targetItemPartitionKey = "gear-surf-surfboards"
// Specify target item id and partition key val targetItemId = "68719518391" val targetItemPartitionKey = "gear-surf-surfboards"
ターゲット項目を指定し、変更する必要があるフィールドを指定するための、パッチ オブジェクトのセットを作成します。
# Create set of patch diffs patchProducts = [{ "id": f"{targetItemId}", "category": f"{targetItemPartitionKey}", "name": "Yamba New Surfboard" }]
// Create set of patch diffs val patchProducts = Seq( (targetItemId, targetItemPartitionKey, "Yamba New Surfboard") )
パッチ オブジェクトのセットを使用してデータ フレームを作成します。
write
を使用してパッチ操作を実行します。# Create data frame spark.createDataFrame(patchProducts) \ .write \ .format("cosmos.oltp") \ .options(**configPatch) \ .mode("APPEND") \ .save()
// Create data frame patchProducts .toDF("id", "category", "name") .write .format("cosmos.oltp") .options(configPatch) .mode("APPEND") .save()
クエリを実行して、パッチ操作の結果を確認します。 これで、項目は
Yamba New Surfboard
という名前になり、それ以外は変更されていないはずです。# Create and run query patchQuery = f"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE id = '{targetItemId}' AND category = '{targetItemPartitionKey}'" patchDf = spark.sql(patchQuery) patchDf.show(1)
// Create and run query val patchQuery = s"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE id = '$targetItemId' AND category = '$targetItemPartitionKey'" val patchDf = spark.sql(patchQuery) patchDf.show(1)
生の JSON データを操作するには、次の操作を実行します。
既存の
config
構成変数をコピーし、新しいコピーでプロパティを変更します。 具体的には、ターゲット コンテナーをemployees
に変更します。 次に、未加工の JSON データを使用するように、contacts
列/フィールドを構成します。# Copy and modify configuration configRawJson = dict(config) configRawJson["spark.cosmos.container"] = "employees" configRawJson["spark.cosmos.write.patch.columnConfigs"] = "[col(contacts).path(/contacts).op(set).rawJson]"
// Copy and modify configuration val configRawJson = scala.collection.mutable.Map.empty ++ config configRawJson ++= Map( "spark.cosmos.container" -> "employees", "spark.cosmos.write.patch.columnConfigs" -> "[col(contacts).path(/contacts).op(set).rawJson]" )
コンテナーに取り込むための一連の従業員を作成します。
# Create employee data employees = ( ("63476388581", "CosmicWorks", "Marketing", "Outside Sales", "Alain Henry", '[ { "type": "phone", "value": "425-555-0117" }, { "email": "alain@adventure-works.com" } ]'), )
// Create employee data val employees = Seq( ("63476388581", "CosmicWorks", "Marketing", "Outside Sales", "Alain Henry", """[ { "type": "phone", "value": "425-555-0117" }, { "email": "alain@adventure-works.com" } ]""") )
データ フレームを作成し、
write
を使って従業員データを取り込みます。# Ingest data spark.createDataFrame(employees) \ .toDF("id", "organization", "department", "team", "name", "contacts") \ .write \ .format("cosmos.oltp") \ .options(**configRawJson) \ .mode("APPEND") \ .save()
// Ingest data spark.createDataFrame(employees) .toDF("id", "organization", "department", "team", "name", "contacts") .write .format("cosmos.oltp") .options(configRawJson) .mode("APPEND") .save()
show
を使ってデータ フレームからデータをレンダリングします。contacts
列が出力で生の JSON であることを確認します。# Read and render data rawJsonDf = spark.read.format("cosmos.oltp") \ .options(**configRawJson) \ .load() rawJsonDf.show()
// Read and render data val rawJsonDf = spark.read.format("cosmos.oltp") .options(configRawJson) .load() rawJsonDf.show()
Related content
- Apache Spark
- Azure Cosmos DB Catalog API
- 構成パラメーター参照
- Azure Cosmos DB Spark コネクタのサンプル
- Spark 2.4 から Spark 3.* に移行する
- Deprecated versions:
- Azure Databricks、Azure Synapse、または Azure HDInsight でサポートされている Spark 3.1 または 3.2 ランタイムが使用できなくなるため、Azure Cosmos DB Spark Connector for Spark 3.1 および 3.2 は非推奨となりました。
- Spark 3.1 から更新するための移行ガイド
- Spark 3.2 から更新するための移行ガイド
- Version compatibility:
- Release notes:
- Download links: