チュートリアル:エンドツーエンドのレイクハウス分析パイプラインを実行する

このチュートリアルでは、Databricksレイクハウスのエンドツーエンド分析パイプラインを設定する方法を説明します。

重要

このチュートリアルでは、対話型ノートブックを使用して、Unityカタログが有効なクラスター上でPythonによる一般的なETLタスクを完了します。Unity Catalogを使用していない場合は、Databricksで最初のETLワークロードを実行するを参照してください。

このチュートリアルのタスク

この記事の終わりまでに、以下をスムーズに行えるようになります。

  1. Unity Catalog 対応のコンピュートクラスターを起動

  2. Databricks ノートブックの作成

  3. Unity Catalog 外部からのデータの書き込みと読み取り

  4. Auto Loader を使用した Unity Catalogテーブルへの増分データ取り込みの構成

  5. ノートブックのセルを実行して、データを処理、クエリー、およびプレビュー

  6. ノートブックを Databricks ジョブとしてスケジュール

  7. Databricks SQL からのUnity Catalog テーブルのクエリ

Databricks には、データ プロフェッショナルが抽出、変換、読み込み (ETL) パイプラインを迅速に開発してデプロイできるようにする、運用環境対応のツール スイートが用意されています。Unity カタログを使用すると、データスチュワードは、組織全体のユーザーのストレージ資格情報、外部ロケーション、およびデータベース オブジェクトを構成してセキュリティで保護できます。Databricks SQL を使用すると、アナリストは運用環境の ETL ワークロードで使用されているのと同じテーブルに対して SQL クエリーを実行できるため、大規模なリアルタイムのビジネスインテリジェンスが可能になります。

Delta Live Tables使用してETLパイプラインを構築することもできます。 DatabricksDelta Live Tables本番運用ETL パイプラインの構築、デプロイ、保守の複雑さを軽減するために を作成しました。「チュートリアル: 最初の Delta Live Tables パイプラインを実行する」を参照してください。

必要条件

クラスター制御権限がない場合でも、 クラスターにアクセスできる限り、以下のステップのほとんどを完了できます。

ステップ1:クラスターを作成する

探索的データ分析とデータエンジニアリングを行うには、コマンドの実行に必要なコンピューティングリソースを提供するクラスターを作成します。

  1. コンピュートアイコン サイドバー の 「コンピュート 」をクリックします

  2. サイドバーで [新しいアイコン 新規 ] をクリックし 、[ クラスター] を選択します。これにより、[新しいクラスター/コンピュート] ページが開きます。

  3. クラスターの一意の名前を指定します。

  4. [単一ノード] ラジオボタンを選択します。

  5. アクセスモード]ドロップダウンから[シングルユーザー]選択します。

  6. ご自身のメールアドレスが[シングルユーザー]フィールドに表示されていることを確認します。

  7. Unityカタログを使用するには、Databricks ランタイムバージョンの11.1 以降を選択します。

  8. [ コンピュートの作成 ] をクリックして、クラスターを作成します。

Databricks クラスターの詳細については、「 コンピュート」を参照してください。

ステップ2:Databricksノートブックを作成する

Databricksで対話型コードの作成と実行を開始するには、ノートブックを作成します。

  1. 新しいアイコン サイドバーで 「 新規 」をクリック し 、「 ノートブック」 をクリックします。

  2. [ノートブックの作成] ページで、次の操作を行います。

    • ノートブックの一意の名前を指定します。

    • デフォルトの言語がPythonに設定されていることを確認してください。

    • [ 接続 ] ドロップダウンメニューを使用して、ステップ 1 で作成したクラスターを [ クラスター ] ドロップダウンから選択します。

ノートブックが開き、空のセルが 1 つ表示されます。

ノートブックの作成と管理の詳細については、ノートブックの管理を参照してください。

手順 3: Unity Catalog によって管理される外部ロケーションからデータの書き込みと読み取りを行う

Databricks では、増分データの取り込みに Auto Loader を使用することをお勧めします。Auto Loader は、新しいファイルがクラウド オブジェクト ストレージに到着すると、自動的に検出して処理します。

Unity Catalog を使用して、外部ロケーションへの安全なアクセスを管理します。外部ロケーションに対する READ FILES アクセス許可を持つユーザーまたはサービスプリンシパルは、 Auto Loader を使用してデータを取り込むことができます。

通常、データは他のシステムからの書き込みのために外部ロケーションに到着します。このデモでは、JSON ファイルを外部ロケーションに書き出すことで、データ到着をシミュレートできます。

次のコードをノートブックのセルにコピーします。catalog の文字列値を、 CREATE CATALOG および USE CATALOG アクセス許可を持つカタログの名前に置き換えます。external_location の文字列値を、 READ FILESWRITE FILES、および CREATE EXTERNAL TABLE のアクセス許可を持つ外部ロケーションのパスに置き換えます。

外部ロケーションは、ストレージ コンテナー全体として定義できますが、多くの場合、コンテナーに入れ子になったディレクトリを指します。

外部ロケーションパスの正しい形式は "gs://bucket-name/path/to/external_location"です。


 external_location = "<your-external-location>"
 catalog = "<your-catalog>"

 dbutils.fs.put(f"{external_location}/filename.txt", "Hello world!", True)
 display(dbutils.fs.head(f"{external_location}/filename.txt"))
 dbutils.fs.rm(f"{external_location}/filename.txt")

 display(spark.sql(f"SHOW SCHEMAS IN {catalog}"))

このセルを実行すると、12バイトを読み取る行が出力され、文字列「Hello world!」が出力され、提供されたカタログに存在するすべてのデータベースが表示されます。 このセルを実行できない場合は、Unity Catalog が有効なワークスペースを使用していることを確認し、ワークスペース管理者に適切なアクセス許可を要求して、このチュートリアルを完了してください。

以下のPythonコードは、Eメールアドレスを使用して、指定されたカタログに一意のデータベースを作成し、指定された外部ロケーションに一意の保存場所を作成します。 このセルを実行すると、このチュートリアルに関連付けられているすべてのデータが削除され、この例をべき等に実行できるようになります。 クラスが定義され、インスタンス化され、これを使用して、接続されたシステムからソースの外部ロケーションに到着するデータのバッチをシミュレートします。

このコードをノートブックの新しいセルにコピーし、実行して環境を構成します。

このコードで定義された変数を使用すると、既存のワークスペースアセットや他のユーザーと競合するリスクを負うことなく、コードを安全に実行できます。ネットワークまたはストレージのアクセス許可が制限されていると、このコードを実行するとエラーが発生します。これらの制限のトラブルシューティングについては、ワークスペース管理者に問い合わせてください。


from pyspark.sql.functions import col

# Set parameters for isolation in workspace and reset demo
username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first()[0]
database = f"{catalog}.e2e_lakehouse_{username}_db"
source = f"{external_location}/e2e-lakehouse-source"
table = f"{database}.target_table"
checkpoint_path = f"{external_location}/_checkpoint/e2e-lakehouse-demo"

spark.sql(f"SET c.username='{username}'")
spark.sql(f"SET c.database={database}")
spark.sql(f"SET c.source='{source}'")

spark.sql("DROP DATABASE IF EXISTS ${c.database} CASCADE")
spark.sql("CREATE DATABASE ${c.database}")
spark.sql("USE ${c.database}")

# Clear out data from previous demo execution
dbutils.fs.rm(source, True)
dbutils.fs.rm(checkpoint_path, True)


# Define a class to load batches of data to source
class LoadData:

    def __init__(self, source):
        self.source = source

    def get_date(self):
        try:
            df = spark.read.format("json").load(source)
        except:
            return "2016-01-01"
        batch_date = df.selectExpr("max(distinct(date(tpep_pickup_datetime))) + 1 day").first()[0]
        if batch_date.month == 3:
            raise Exception("Source data exhausted")
        return batch_date

    def get_batch(self, batch_date):
        return (
            spark.table("samples.nyctaxi.trips")
            .filter(col("tpep_pickup_datetime").cast("date") == batch_date)
        )

    def write_batch(self, batch):
        batch.write.format("json").mode("append").save(self.source)

    def land_batch(self):
        batch_date = self.get_date()
        batch = self.get_batch(batch_date)
        self.write_batch(batch)

RawData = LoadData(source)

次のコードをセルにコピーして実行することで、データのバッチをランディングできるようになりました。 このセルを最大 60 回まで手動で実行して、新しいデータの到着をトリガーできます。

RawData.land_batch()

ステップ4:データをUnityカタログに取り込むようにAuto Loaderを設定する

Databricksでは、 Delta Lakeを使用してデータを保存することをお勧めします。Delta Lakeは、ACIDトランザクションを提供し、データレイクハウスを可能にするオープンソースストレージレイヤーです。Delta Lakeは、Databricksで作成されるデフォルトのテーブル形式です。

Unity Catalogテーブルにデータを取り込むようにAuto Loaderを設定するには、次のコードをコピーしてノートブックの空のセルに貼り付けます:

# Import functions
from pyspark.sql.functions import col, current_timestamp

# Configure Auto Loader to ingest JSON data to a Delta table
(spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaLocation", checkpoint_path)
  .load(file_path)
  .select("*", col("_metadata.file_path").alias("source_file"), current_timestamp().alias("processing_time"))
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .trigger(availableNow=True)
  .option("mergeSchema", "true")
  .toTable(table))

Auto Loaderの詳細については、 Auto Loaderとはを参照してください。

構造化ストリーミングを Unity Catalogで使用する方法については、「Unity Catalog を構造化ストリーミングで使用する」を参照してください。

ステップ5:データを処理し、操作する

ノートブックは、セルごとにロジックを実行します。以下のステップを使用して、セル内のロジックを実行します。

  1. 前の手順で完了したセルを実行するには、セルを選択して SHIFT+ENTER キーを押します。

  2. 作成したテーブルを検索するには、次のコードをコピーして空のセルに貼り付け、Shift+Enter キーを押してセルを実行します。

    df = spark.read.table(table_name)
    
  3. データフレーム内のデータをプレビューするには、次のコードを空のセルにコピーして貼り付け、Shift + Enter キーを押してセルを実行します。

    display(df)
    

データを視覚化するための対話型オプションの詳細については、「 Databricksノートブックでのビジュアライゼーション」を参照してください。

ステップ6:ジョブをスケジュールする

DatabricksノートブックをDatabricksジョブのタスクとして追加することで、Databricksノートブックを本番運用スクリプトとして実行できます。このステップでは、手動でトリガーできる新しいジョブを作成します。

ノートブックをタスクとしてスケジュールするには:

  1. ヘッダーバーの右側にある[スケジュール] をクリックします。

  2. ジョブ名に一意の名前を入力します。

  3. [手動] をクリックします。

  4. [クラスター] ドロップダウンで、ステップ1で作成したクラスターを選択します。

  5. [作成]をクリックします。

  6. 表示されるウィンドウで、[今すぐ実行] をクリックします。

  7. ジョブ実行の結果を表示するには、[外部リンク 最終実行 タイムスタンプ] の横にあるアイコンをクリックします 。

ジョブの詳細については、「 Databricks ジョブとは」を参照してください。

ステップ 7: Databricks SQLでテーブルを検索する

現在のカタログに対するUSE CATALOG権限、現在のスキーマに対するUSE SCHEMA権限、およびテーブルに対するSELECT権限を持つユーザーは、優先するDatabricks APIからテーブルの内容をクエリーできます。

Databricks SQLでクエリーを実行するには、実行中のSQLウェアハウスにアクセスする必要があります。

このチュートリアルの前半で作成したテーブルの名前は target_tableです。 最初のセルで指定したカタログと、 e2e_lakehouse_<your-username>のデータベースを使用してクエリーできます。 「カタログエクスプローラ」(Catalog Explorer) を使用して、作成したデータオブジェクトを検索できます。

その他の統合

Databricksを使用したデータエンジニアリングのための統合とツールの詳細をご覧ください。