Databricksで最初のETLワークロードを実行する
Databricksの本番対応ツールを使用して、データオーケストレーション用の最初の抽出、変換、ロード(ETL)パイプラインを開発およびデプロイする方法をご紹介します。
この記事の終わりまでに、以下をスムーズに行えるようになります。
このチュートリアルでは、対話型ノートブックを使用して、PythonまたはScalaによる一般的なETLタスクを完了します。
Delta Live Tables使用してETLパイプラインを構築することもできます。 DatabricksDelta Live Tables本番運用ETL パイプラインの構築、デプロイ、保守の複雑さを軽減するために を作成しました。「チュートリアル: 最初の Delta Live Tables パイプラインを実行する」を参照してください。
Databricks Terraformプロバイダーを使用してこの記事のリソースを作成することもできます。「Terraformでクラスター、ノートブック、ジョブを作成する」を参照してください。
要件
Databricksワークスペースにログインしていること。
クラスターを作成する権限があります。
注
クラスター制御権限がない場合でも、 クラスターにアクセスできる限り、以下のステップのほとんどを完了できます。
ステップ1:クラスターを作成する
探索的データ分析とデータエンジニアリングを行うには、コマンドの実行に必要なコンピュートリソースを提供するクラスターを作成します。
サイドバー の 「コンピュート 」をクリックします 。
「コンピュート」ページで、「クラスターを作成」をクリックします。これにより、「新しいクラスター」ページが開きます。
クラスターの一意の名前を指定し、残りの値はデフォルトのままにして、「クラスターを作成」をクリックします。
Databricks クラスターの詳細については、「 コンピュート」を参照してください。
ステップ2:Databricksノートブックを作成する
ワークスペースにノートブックを作成するには、サイドバーで「新規」をクリックし、 「新聞」をクリックします。 ワークスペースに空白のノートブックが開きます。
ノートブックの作成と管理の詳細については、「ノートブックの管理」を参照してください。
ステップ3:データをDelta Lakeに取り込むようにAuto Loaderを構成する
Databricks では、増分データの取り込みに Auto Loader を使用することをお勧めします。Auto Loader は、新しいファイルがクラウドオブジェクトストレージに到着すると、自動的に検出して処理します。
Databricksでは、Delta Lakeを使用してデータを保存することをお勧めします。Delta Lakeは、ACIDトランザクションを提供し、データレイクハウスを可能にするオープンソースストレージレイヤーです。Delta Lakeは、Databricksで作成されるデフォルトのテーブル形式です。
Delta Lakeテーブルにデータを取り込むようにAuto Loaderを設定するには、以下のコードをコピーしてノートブックの空のセルに貼り付けます。
# Import functions
from pyspark.sql.functions import col, current_timestamp
# Define variables used in code below
file_path = "/databricks-datasets/structured-streaming/events"
username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first()[0]
table_name = f"{username}_etl_quickstart"
checkpoint_path = f"/tmp/{username}/_checkpoint/etl_quickstart"
# Clear out data from previous demo execution
spark.sql(f"DROP TABLE IF EXISTS {table_name}")
dbutils.fs.rm(checkpoint_path, True)
# Configure Auto Loader to ingest JSON data to a Delta table
(spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation", checkpoint_path)
.load(file_path)
.select("*", col("_metadata.file_path").alias("source_file"), current_timestamp().alias("processing_time"))
.writeStream
.option("checkpointLocation", checkpoint_path)
.trigger(availableNow=True)
.toTable(table_name))
// Imports
import org.apache.spark.sql.functions.current_timestamp
import org.apache.spark.sql.streaming.Trigger
import spark.implicits._
// Define variables used in code below
val file_path = "/databricks-datasets/structured-streaming/events"
val username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first.get(0)
val table_name = s"${username}_etl_quickstart"
val checkpoint_path = s"/tmp/${username}/_checkpoint"
// Clear out data from previous demo execution
spark.sql(s"DROP TABLE IF EXISTS ${table_name}")
dbutils.fs.rm(checkpoint_path, true)
// Configure Auto Loader to ingest JSON data to a Delta table
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation", checkpoint_path)
.load(file_path)
.select($"*", $"_metadata.file_path".as("source_file"), current_timestamp.as("processing_time"))
.writeStream
.option("checkpointLocation", checkpoint_path)
.trigger(Trigger.AvailableNow)
.toTable(table_name)
注
このコードで定義された変数を使用すると、既存のワークスペースアセットや他のユーザーと競合するリスクを負うことなく、コードを安全に実行できます。ネットワークまたはストレージのアクセス許可が制限されていると、このコードを実行するとエラーが発生します。これらの制限のトラブルシューティングについては、ワークスペース管理者に問い合わせてください。
Auto Loaderの詳細については、「Auto Loaderとは」を参照してください。
ステップ4:データを処理し、操作する
ノートブックは、セルごとにロジックを実行します。セル内のロジックを実行するには、以下を実行します。
前の手順で完了したセルを実行するには、セルを選択してSHIFT+ENTERキーを押します。
作成したテーブルを検索するには、以下のコードをコピーして空のセルに貼り付け、Shift+Enterキーを押してセルを実行します。
df = spark.read.table(table_name)
val df = spark.read.table(table_name)
DataFrame内のデータをプレビューするには、以下のコードを空のセルにコピーして貼り付け、Shift + Enterキーを押してセルを実行します。
display(df)
display(df)
データを視覚化するための対話型オプションの詳細については、「Databricksノートブックでのビジュアライゼーション」を参照してください。
ステップ5:ジョブをスケジュールする
DatabricksノートブックをDatabricksジョブのタスクとして追加することで、Databricksノートブックを本番運用スクリプトとして実行できます。このステップでは、手動でトリガーできる新しいジョブを作成します。
ノートブックをタスクとしてスケジュールするには、以下を実行します。
ヘッダーバーの右側にある「スケジュール」をクリックします。
「ジョブ名」に一意の名前を入力します。
「手動」をクリックします。
「クラスター」ドロップダウンで、ステップ1で作成したクラスターを選択します。
「作成」をクリックします。
表示されるウィンドウで、「今すぐ実行」をクリックします。
ジョブ実行の結果を表示するには、[ 最終実行 タイムスタンプ] の横にあるアイコンをクリックします 。
ジョブの詳細については、「Databricksジョブとは」を参照してください。