チュートリアル:エンドツーエンドのレイクハウス分析パイプラインを実行する
このチュートリアルでは、Databricksレイクハウスのエンドツーエンド分析パイプラインを設定する方法を説明します。
重要
このチュートリアルでは、対話型ノートブックを使用して、Unityカタログが有効なクラスター上でPythonによる一般的なETLタスクを完了します。Unity Catalogを使用していない場合は、Databricksで最初のETLワークロードを実行するを参照してください。
このチュートリアルのタスク
この記事の終わりまでに、以下をスムーズに行えるようになります。
Databricks には、データ プロフェッショナルが抽出、変換、読み込み (ETL) パイプラインを迅速に開発してデプロイできるようにする、運用環境対応のツール スイートが用意されています。Unity Catalogを使用すると、データスチュワードは、組織全体のユーザーのストレージ資格情報、外部ロケーション、およびデータベース オブジェクトを構成してセキュリティで保護できます。Databricks SQL を使用すると、アナリストは運用環境の ETL ワークロードで使用されているのと同じテーブルに対して SQL クエリーを実行できるため、大規模なリアルタイムのビジネスインテリジェンスが可能になります。
Delta Live Tables使用してETLパイプラインを構築することもできます。 DatabricksDelta Live Tables本番運用ETL パイプラインの構築、デプロイ、保守の複雑さを軽減するために を作成しました。「チュートリアル: 最初の Delta Live Tables パイプラインを実行する」を参照してください。
必要条件
注
クラスター制御権限がない場合でも、 クラスターにアクセスできる限り、以下のステップのほとんどを完了できます。
ステップ1:クラスターを作成する
探索的データ分析とデータエンジニアリングを行うには、コマンドの実行に必要なコンピューティングリソースを提供するクラスターを作成します。
サイドバー の 「コンピュート 」をクリックします 。
サイドバーで [ 新規 ] をクリックし 、[ クラスター] を選択します。これにより、[新しいクラスター/コンピュート] ページが開きます。
クラスターの一意の名前を指定します。
[単一ノード] ラジオボタンを選択します。
[アクセスモード]ドロップダウンから[シングルユーザー]選択します。
ご自身のメールアドレスが[シングルユーザー]フィールドに表示されていることを確認します。
Unityカタログを使用するには、Databricks ランタイムバージョンの11.1 以降を選択します。
[ コンピュートの作成 ] をクリックして、クラスターを作成します。
Databricks クラスターの詳細については、「 コンピュート」を参照してください。
ステップ2:Databricksノートブックを作成する
ワークスペースにノートブックを作成するには、サイドバーで「新規」をクリックし、 「新聞」をクリックします。 ワークスペースに空白のノートブックが開きます。
ノートブックの作成と管理の詳細については、ノートブックの管理を参照してください。
ステップ3:Unity Catalogによって管理される外部ロケーションからデータの書き込みと読み取りを行う
Databricks では、増分データの取り込みに Auto Loader を使用することをお勧めします。Auto Loader は、新しいファイルがクラウドオブジェクトストレージに到着すると、自動的に検出して処理します。
Unity Catalog を使用して、外部ロケーションへの安全なアクセスを管理します。外部ロケーションに対する READ FILES
アクセス許可を持つユーザーまたはサービスプリンシパルは、 Auto Loader を使用してデータを取り込むことができます。
通常、データは他のシステムからの書き込みのために外部ロケーションに到着します。このデモでは、JSON ファイルを外部ロケーションに書き出すことで、データ到着をシミュレートできます。
次のコードをノートブックのセルにコピーします。catalog
の文字列値を、 CREATE CATALOG
および USE CATALOG
アクセス許可を持つカタログの名前に置き換えます。external_location
の文字列値を、 READ FILES
、WRITE FILES
、および CREATE EXTERNAL TABLE
のアクセス許可を持つ外部ロケーションのパスに置き換えます。
外部ロケーションは、ストレージ コンテナー全体として定義できますが、多くの場合、コンテナーに入れ子になったディレクトリを指します。
外部ロケーションパスの正しい形式は "gs://bucket-name/path/to/external_location"
です。
external_location = "<your-external-location>"
catalog = "<your-catalog>"
dbutils.fs.put(f"{external_location}/filename.txt", "Hello world!", True)
display(dbutils.fs.head(f"{external_location}/filename.txt"))
dbutils.fs.rm(f"{external_location}/filename.txt")
display(spark.sql(f"SHOW SCHEMAS IN {catalog}"))
このセルを実行すると、12バイトを読み取る行が出力され、文字列「Hello world!」が出力され、提供されたカタログに存在するすべてのデータベースが表示されます。 このセルを実行できない場合は、Unity Catalog が有効なワークスペースを使用していることを確認し、ワークスペース管理者に適切なアクセス許可を要求して、このチュートリアルを完了してください。
以下のPythonコードは、Eメールアドレスを使用して、指定されたカタログに一意のデータベースを作成し、指定された外部ロケーションに一意の保存場所を作成します。 このセルを実行すると、このチュートリアルに関連付けられているすべてのデータが削除され、この例をべき等に実行できるようになります。 クラスが定義され、インスタンス化され、これを使用して、接続されたシステムからソースの外部ロケーションに到着するデータのバッチをシミュレートします。
このコードをノートブックの新しいセルにコピーし、実行して環境を構成します。
注
このコードで定義された変数を使用すると、既存のワークスペースアセットや他のユーザーと競合するリスクを負うことなく、コードを安全に実行できます。ネットワークまたはストレージのアクセス許可が制限されていると、このコードを実行するとエラーが発生します。これらの制限のトラブルシューティングについては、ワークスペース管理者に問い合わせてください。
from pyspark.sql.functions import col
# Set parameters for isolation in workspace and reset demo
username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first()[0]
database = f"{catalog}.e2e_lakehouse_{username}_db"
source = f"{external_location}/e2e-lakehouse-source"
table = f"{database}.target_table"
checkpoint_path = f"{external_location}/_checkpoint/e2e-lakehouse-demo"
spark.sql(f"SET c.username='{username}'")
spark.sql(f"SET c.database={database}")
spark.sql(f"SET c.source='{source}'")
spark.sql("DROP DATABASE IF EXISTS ${c.database} CASCADE")
spark.sql("CREATE DATABASE ${c.database}")
spark.sql("USE ${c.database}")
# Clear out data from previous demo execution
dbutils.fs.rm(source, True)
dbutils.fs.rm(checkpoint_path, True)
# Define a class to load batches of data to source
class LoadData:
def __init__(self, source):
self.source = source
def get_date(self):
try:
df = spark.read.format("json").load(source)
except:
return "2016-01-01"
batch_date = df.selectExpr("max(distinct(date(tpep_pickup_datetime))) + 1 day").first()[0]
if batch_date.month == 3:
raise Exception("Source data exhausted")
return batch_date
def get_batch(self, batch_date):
return (
spark.table("samples.nyctaxi.trips")
.filter(col("tpep_pickup_datetime").cast("date") == batch_date)
)
def write_batch(self, batch):
batch.write.format("json").mode("append").save(self.source)
def land_batch(self):
batch_date = self.get_date()
batch = self.get_batch(batch_date)
self.write_batch(batch)
RawData = LoadData(source)
次のコードをセルにコピーして実行することで、データのバッチをランディングできるようになりました。 このセルを最大 60 回まで手動で実行して、新しいデータの到着をトリガーできます。
RawData.land_batch()
ステップ4:データをUnityカタログに取り込むようにAuto Loaderを設定する
Databricksでは、 Delta Lakeを使用してデータを保存することをお勧めします。Delta Lakeは、ACIDトランザクションを提供し、データレイクハウスを可能にするオープンソースストレージレイヤーです。Delta Lakeは、Databricksで作成されるデフォルトのテーブル形式です。
Unity Catalogテーブルにデータを取り込むようにAuto Loaderを設定するには、次のコードをコピーしてノートブックの空のセルに貼り付けます:
# Import functions
from pyspark.sql.functions import col, current_timestamp
# Configure Auto Loader to ingest JSON data to a Delta table
(spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation", checkpoint_path)
.load(source)
.select("*", col("_metadata.source").alias("source_file"), current_timestamp().alias("processing_time"))
.writeStream
.option("checkpointLocation", checkpoint_path)
.trigger(availableNow=True)
.option("mergeSchema", "true")
.toTable(table))
Auto Loaderの詳細については、「Auto Loaderとは」を参照してください。
構造化ストリーミングを Unity Catalogで使用する方法については、「Unity Catalog を構造化ストリーミングで使用する」を参照してください。
ステップ5:データを処理し、操作する
ノートブックは、セルごとにロジックを実行します。以下のステップを使用して、セル内のロジックを実行します。
前の手順で完了したセルを実行するには、セルを選択して SHIFT+ENTER キーを押します。
作成したテーブルを検索するには、次のコードをコピーして空のセルに貼り付け、Shift+Enter キーを押してセルを実行します。
df = spark.read.table(table)
データフレーム内のデータをプレビューするには、次のコードを空のセルにコピーして貼り付け、Shift + Enter キーを押してセルを実行します。
display(df)
データを視覚化するための対話型オプションの詳細については、「 Databricksノートブックでのビジュアライゼーション」を参照してください。
ステップ6:ジョブをスケジュールする
DatabricksノートブックをDatabricksジョブのタスクとして追加することで、Databricksノートブックを本番運用スクリプトとして実行できます。このステップでは、手動でトリガーできる新しいジョブを作成します。
ノートブックをタスクとしてスケジュールするには:
ヘッダーバーの右側にある[スケジュール] をクリックします。
ジョブ名に一意の名前を入力します。
[手動] をクリックします。
[クラスター] ドロップダウンで、ステップ1で作成したクラスターを選択します。
[作成]をクリックします。
表示されるウィンドウで、[今すぐ実行] をクリックします。
ジョブ実行の結果を表示するには、[ 最終実行 タイムスタンプ] の横にあるアイコンをクリックします 。
ジョブの詳細については、「Databricksジョブとは」を参照してください。
ステップ 7:Databricks SQLでテーブルを検索する
現在のカタログに対するUSE CATALOG
権限、現在のスキーマに対するUSE SCHEMA
権限、およびテーブルに対するSELECT
権限を持つユーザーは、優先するDatabricks APIからテーブルの内容をクエリーできます。
Databricks SQLでクエリーを実行するには、実行中のSQLウェアハウスにアクセスする必要があります。
このチュートリアルの前半で作成したテーブルの名前は target_table
です。 最初のセルで指定したカタログと、 e2e_lakehouse_<your-username>
のデータベースを使用してクエリーできます。 「カタログエクスプローラ」(Catalog Explorer) を使用して、作成したデータオブジェクトを検索できます。