チュートリアル:最初のDelta Live Tablesパイプラインの実行

このチュートリアルでは、Databricks ノートブックのコードから Delta Live Tables パイプラインを構成し、パイプライン更新をトリガーしてパイプラインを実行する方法を示します。 このチュートリアルには、 PythonおよびSQLインターフェイスを使用してサンプル コードを含むサンプル データセットを取り込んで処理するサンプル パイプラインが含まれています。 このチュートリアルの手順を使用して、適切に定義された Delta Live Tables 構文を持つ任意のノートブックでパイプラインを作成することもできます。

Delta Live Tables パイプラインを構成し、Databricks ワークスペース UI または API、CLI、Databricks アセット バンドルなどの自動ツール オプションを使用して、または Databricks ワークフロー内のタスクとして更新をトリガーできます。 Delta Live Tables の機能と特長を理解するために、Databricks では、まず UI を使用してパイプラインを作成および実行することをお勧めします。 さらに、UI でパイプラインを構成すると、Delta Live Tables はプログラムによるワークフローの実装に使用できるパイプラインの JSON 構成を生成します。

Delta Live Tables機能を実証するために、このチュートリアルの例では、公開されているデータセットをダウンロードします。 ただし、 Databricks 、実際のユースケースを実装する Pipeline で使用されるデータソースに接続してデータを取り込む方法がいくつかあります。 「Delta Live Tables を使用してデータを取り込む」を参照してください。

要件

  • 非サーバーレス パイプラインを開始するには、 クラスター作成権限、またはDelta Live Tablesを定義するクラスターポリシーへのアクセス権が必要です。 Delta Live Tables ランタイムは、パイプラインを実行する前にクラスターを作成しますが、適切な権限がない場合は失敗します。

  • このチュートリアルの例を使用するには、ワークスペースでUnity Catalogが有効になっている必要があります。

  • Unity Catalogで次の権限が必要です。

    • READ VOLUME WRITE VOLUME、または ALL PRIVILEGESは、my-volumeボリュームです。

    • USE SCHEMA またはdefaultスキーマのALL PRIVILEGES

    • USE CATALOG またはmainカタログの場合はALL PRIVILEGES

    これらの権限を設定するには、 Databricks管理者またはUnity Catalog権限とセキュリティ保護可能なオブジェクトを参照してください。

  • このチュートリアルの例では、 Unity Catalogボリュームを使用してサンプル データを保存します。 これらの例を使用するには、ボリュームを作成し、そのボリュームのカタログ名、スキーマ名、およびボリューム名を使用して、例で使用するボリュームパスを設定します。

ワークスペースでUnity Catalog 有効になっていない場合は、 必要としないサンプルを含む ノートブックが Unity Catalogこの記事に添付されています。これらの例を使用するには、パイプラインの作成時にストレージ オプションとしてHive metastoreを選択します。

Delta Live Tables クエリはどこで実行しますか?

Delta Live Tablesクエリは主にDatabricksノートブックに実装されていますが、 Delta Live Tablesノートブックのセルで対話的に実行できるように設計されていません。 Databricks ノートブックで Delta Live Tables 構文を含むセルを実行すると、エラー メッセージが表示されます。 クエリを実行するには、ノートブックをパイプラインの一部として構成する必要があります。

重要

  • Delta Live Tablesのクエリを作成する場合、ノートブックのセルごとの実行順序に依存することはできません。 Delta Live Tablesノートブックで定義されたすべてのコードを評価して実行しますが、ノートブック実行 allコマンドとは実行モデルが異なります。

  • 単一の Delta Live Tables ソース コード ファイル内で言語を混在させることはできません。 たとえば、ノートブックには Python クエリまたは SQL クエリのみを含めることができます。 パイプラインで複数の言語を使用する必要がある場合は、パイプラインで複数の言語固有のノートブックまたはファイルを使用します。

ファイルに保存された Python コードを使用することもできます。 たとえば、Python パイプラインにインポートできる Python モジュールを作成したり、SQL クエリで使用する Python ユーザー定義関数 (UDF) を定義したりすることができます。 Python モジュールのインポートの詳細については、 「Git フォルダーまたはワークスペース ファイルから Python モジュールをインポートする」を参照してください。 Python UDF の使用方法については、 「ユーザー定義スカラー関数 - Python」を参照してください。

例: ニューヨークの赤ちゃんの名前データの取り込みと処理

この記事の例では、ニューヨーク州の赤ちゃんの名前の記録を含む、公開されているデータセットを使用しています。 これらの例は、Delta Live Tables パイプラインを使用して次のことを行う方法を示しています。

  • 公開されているデータセットから生の CSV データをテーブルに読み取ります。

  • 生データテーブルからレコードを読み取り、Delta Live Tables Expectationsを使用して、クレンジングされたデータを含む新しいテーブルを作成します。

  • クレンジングされたレコードを、派生データセットを作成するDelta Live Tablesクエリへの入力として使用します。

このコードは、メダリオンアーキテクチャの簡略化された例を示しています。「メダリオンレイクハウスの建築とは?」を参照してください。

この例の実装は、 PythonおよびSQLインターフェイス用に提供されています。 ステップ に従ってサンプル コードを含む新しいパイプラインを作成することも、ステップ 1 に進んで、このページで提供されているパイプラインいずれかを使用することもできます。

Python で Delta Live Tables パイプラインを実装する

PythonDelta Live Tablesセットを作成する コードは、DataFrames PySparkまたは for の経験があるユーザーには馴染みのあるPandas Spark返す必要があります。DataFrames に慣れていないユーザーの場合、Databricks では SQL インターフェイスの使用をお勧めします。 を使用したDelta Live Tables パイプラインの実装をSQL 参照してください。

すべてのDelta Live Tables Python APIs dlt モジュールに実装されています。 Python で実装された Delta Live Tables パイプライン コードは、Python ノートブックおよびファイルの先頭にあるdltモジュールを明示的にインポートする必要があります。 Delta Live Tables 、多くのPythonスクリプトとは重要な点で異なります。Delta Delta Live Tablesデータセットを作成するために、データ取り込みと変換を実行する関数を呼び出す必要はありません。 代わりに、Delta Live Tables は、パイプラインにロードされたすべてのファイル内のdltモジュールからのデコレーター関数を解釈し、データフロー グラフを構築します。

このチュートリアルの例を実装するには、次の Python コードをコピーして、新しい Python ノートブックに貼り付けます。 各サンプル コード スニペットを、説明されている順序でノートブック内の独自のセルに追加する必要があります。 ノートブックを作成するためのオプションを確認するには、 「ノートブックを作成する」を参照してください。

Python インターフェイスを使用してパイプラインを作成する場合、デフォルトでは、テーブル名は関数名によって定義されます。 たとえば、次の Python の例では、 baby_names_rawbaby_names_preparedtop_baby_names_2021という名前の 3 つのテーブルが作成されます。 nameを使用してテーブル名をオーバーライドできます。 「Delta Live Tables マテリアライズド ビューまたはストリーミング テーブルの作成」を参照してください。

Delta Live Tablesモジュールをインポートする

すべてのDelta Live Tables Python APIは、dlt モジュールに実装されています。Pythonノートブックやファイルの先頭でdltモジュールを明示的にインポートする。

次の例は、このインポートと、pyspark.sql.functionsのインポートステートメントを示しています。

import dlt
from pyspark.sql.functions import *

データのダウンロード

この例のデータを取得するには、次のように CSV ファイルをダウンロードしてボリュームに保存します。

import os

os.environ["UNITY_CATALOG_VOLUME_PATH"] = "/Volumes/<catalog-name>/<schema-name>/<volume-name>/"
os.environ["DATASET_DOWNLOAD_URL"] = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
os.environ["DATASET_DOWNLOAD_FILENAME"] = "rows.csv"

dbutils.fs.cp(f"{os.environ.get('DATASET_DOWNLOAD_URL')}", f"{os.environ.get('UNITY_CATALOG_VOLUME_PATH')}{os.environ.get('DATASET_DOWNLOAD_FILENAME')}")

<catalog-name><schema-name>、および <volume-name> を、 Unity Catalogボリュームのカタログ、スキーマ、およびボリューム名に置き換えます。

オブジェクトストレージ内のファイルからのテーブルの作成

Delta Live Tables は、Databricks でサポートされているすべての形式からのデータの読み込みをサポートしています。 「データ形式のオプション」を参照してください。

@dlt.tableデコレータは、関数によって返されたDataFrameの結果を含むテーブルを作成するように Delta Live Tables に指示します。 @dlt.tablePythonSparkDataFrameに新しいテーブルを登録するための を返す 関数定義の前に、Delta Live Tables デコレータを追加します。次の例は、関数名をテーブル名として使用し、テーブルに説明コメントを追加する方法を示しています。

@dlt.table(
  comment="Popular baby first names in New York. This data was ingested from the New York State Department of Health."
)
def baby_names_raw():
  df = spark.read.csv(f"{os.environ.get('UNITY_CATALOG_VOLUME_PATH')}{os.environ.get('DATASET_DOWNLOAD_FILENAME')}", header=True, inferSchema=True)
  df_renamed_column = df.withColumnRenamed("First Name", "First_Name")
  return df_renamed_column

パイプラインの上流のデータセットからテーブルを追加する

dlt.read()を使用して、現在のDelta Live Tablesパイプラインで宣言されている他のデータセットからデータを読み取ることができます。この方法で新しいテーブルを宣言すると、更新を実行する前にDelta Live Tablesによって自動的に解決される依存関係が作成されます。次のコードには、期待に基づいてデータ品質を監視および強制する例も含まれています。「Delta Live Tablesによるデータ品質の管理」を参照してください。

@dlt.table(
  comment="New York popular baby first name data cleaned and prepared for analysis."
)
@dlt.expect("valid_first_name", "First_Name IS NOT NULL")
@dlt.expect_or_fail("valid_count", "Count > 0")
def baby_names_prepared():
  return (
    dlt.read("baby_names_raw")
      .withColumnRenamed("Year", "Year_Of_Birth")
      .select("Year_Of_Birth", "First_Name", "Count")
  )

強化されたデータビューを含むテーブルを作成する

Delta Live Tablesはパイプラインの更新を一連の依存関係グラフとして処理するので、特定のビジネスロジックを含むテーブルを宣言することで、ダッシュボード、BI、分析を強化する高度に強化されたビューを宣言できます。

Delta Live Tablesのテーブルは、概念的にはマテリアライズド ビューと同等です。 Spark の従来のビューは、ビューがクエリされるたびにロジックを実行しますが、Delta Live Tables テーブルはクエリ結果の最新バージョンをデータ ファイルに保存します。 Delta Live Tables はパイプライン内のすべてのデータセットの更新を管理するため、マテリアライズド ビューのレイテンシ要件に合わせてパイプラインの更新をスケジュールし、これらのテーブルに対するクエリに利用可能な最新バージョンのデータが含まれていることを確認できます。

以下のコードで定義されたテーブルは、パイプラインの上流データから派生したマテリアライズドビューとの概念的な類似性を示しています:

@dlt.table(
  comment="A table summarizing counts of the top baby names for New York for 2021."
)
def top_baby_names_2021():
  return (
    dlt.read("baby_names_prepared")
      .filter(expr("Year_Of_Birth == 2021"))
      .groupBy("First_Name")
      .agg(sum("Count").alias("Total_Count"))
      .sort(desc("Total_Count"))
      .limit(10)
  )

ノートブックを使用するパイプラインを構成するには、 「パイプラインの作成」を参照してください。

SQL を使用して Delta Live Tables パイプラインを実装する

DatabricksDelta Live Tablesでは、SQL SQLユーザーが 上に新しいETL 、取り込み、および変換パイプラインを構築するための推奨方法として、 を使用したDatabricks を推奨しています。Delta Live Tables の SQL インターフェイスは、多くの新しいキーワード、構造、およびテーブル値関数を使用して標準の Spark SQL を拡張します。 標準SQLへのこれらの追加により、ユーザーは新しいツールや追加の概念を学習することなく、データセット間の依存関係を宣言し、本番運用グレードのインフラストラクチャをデプロイできるようになります。

Spark DataFrames に精通していて、メタプログラミング操作など、SQL での実装が難しいより広範なテストや操作のサポートが必要なユーザーの場合、Databricks では Python インターフェイスの使用をお勧めします。 「例: ニューヨークの赤ちゃんの名前データの取り込みと処理」を参照してください。

データのダウンロード

この例のデータを取得するには、次のコードをコピーし、新しいノートブックに貼り付けて、ノートブックを実行します。 ノートブックを作成するためのオプションを確認するには、 「ノートブックを作成する」を参照してください。

%sh
wget -O "/Volumes/<catalog-name>/<schema-name>/<volume-name>/babynames.csv" "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"

<catalog-name><schema-name>、および <volume-name> を、 Unity Catalogボリュームのカタログ、スキーマ、およびボリューム名に置き換えます。

Unity Catalog内のファイルからテーブルを作成する

この例の残りの部分では、次の SQL スニペットをコピーし、前のセクションのノートブックとは別の新しい SQL ノートブックに貼り付けます。 各サンプル SQL スニペットを、説明されている順序でノートブック内の独自のセルに追加する必要があります。

Delta Live Tables は、Databricks でサポートされているすべての形式からのデータの読み込みをサポートしています。 「データ形式のオプション」を参照してください。

Delta Live TablesのすべてのSQLステートメントは、CREATE OR REFRESH構文とセマンティクスを使用します。パイプラインを更新すると、Delta Live Tablesは、増分処理によってテーブルの論理的に正しい結果を達成できるか、それとも完全な再計算が必要かどうかを判断します。

次の例では、 ボリュームに保存されている ファイルからデータをロードしてテーブルを作成します。CSVUnity Catalog

CREATE OR REFRESH LIVE TABLE baby_names_sql_raw
COMMENT "Popular baby first names in New York. This data was ingested from the New York State Department of Health."
AS SELECT Year, `First Name` AS First_Name, County, Sex, Count FROM read_files(
  '/Volumes/<catalog-name>/<schema-name>/<volume-name>/babynames.csv',
  format => 'csv',
  header => true,
  mode => 'FAILFAST')

<catalog-name><schema-name>、および <volume-name> を、 Unity Catalogボリュームのカタログ、スキーマ、およびボリューム名に置き換えます。

上流のデータセットからパイプラインにテーブルを追加する

live仮想スキーマを使用して、現在の Delta Live Tables パイプラインで宣言されている他のデータセットのデータをクエリできます。 この方法で新しいテーブルを宣言すると、更新を実行する前にDelta Live Tables自動的に解決される依存関係が作成されます。 live スキーマは、データセットを公開する場合にターゲット スキーマの代わりに使用できる、 Delta Live Tablesに実装されたカスタム キーワードです。 でUnity Catalog Delta Live Tablesを使用する」 および パイプラインから にデータを公開する」をDelta Live TablesHive metastore 参照してください。

次のコードには、期待に基づいてデータ品質を監視および強制する例も含まれています。Delta Live Tablesによるデータ品質の管理を参照してください

CREATE OR REFRESH LIVE TABLE baby_names_sql_prepared(
  CONSTRAINT valid_first_name EXPECT (First_Name IS NOT NULL),
  CONSTRAINT valid_count EXPECT (Count > 0) ON VIOLATION FAIL UPDATE
)
COMMENT "New York popular baby first name data cleaned and prepared for analysis."
AS SELECT
  Year AS Year_Of_Birth,
  First_Name,
  Count
FROM live.baby_names_sql_raw;

充実したデータビューを作成する

Delta Live Tablesはパイプラインの更新を一連の依存関係グラフとして処理するので、特定のビジネスロジックを含むテーブルを宣言することで、ダッシュボード、BI、分析を強化する高度に強化されたビューを宣言できます。

ライブテーブルは、概念的には具体化されたビューと同等です。 Spark の従来のビューは、ビューがクエリされるたびにロジックを実行しますが、ライブ テーブルはクエリ結果の最新バージョンをデータ ファイルに保存します。 Delta Live Tables はパイプライン内のすべてのデータセットの更新を管理するため、マテリアライズド ビューのレイテンシ要件に合わせてパイプラインの更新をスケジュールし、これらのテーブルに対するクエリに利用可能な最新バージョンのデータが含まれていることを確認できます。

次のコードは、アップストリームデータの強化されたマテリアライズドビューを作成します:

CREATE OR REFRESH LIVE TABLE top_baby_names_sql_2021
COMMENT "A table summarizing counts of the top baby names for New York for 2021."
AS SELECT
  First_Name,
  SUM(Count) AS Total_Count
FROM live.baby_names_sql_prepared
WHERE Year_Of_Birth = 2021
GROUP BY First_Name
ORDER BY Total_Count DESC
LIMIT 10;

ノートブックを使用するパイプラインを構成するには、 「パイプラインの作成」に進みます。

パイプラインを作成する

Delta Live Tables構文を使用してノートブックまたはファイル ( ソース コード または ライブラリ と呼ばれる)Delta Live Tables で定義された依存関係を解決することによってパイプラインを作成します。各ソース コード ファイルには 1 つの言語のみを含めることができますが、パイプライン内で異なる言語のライブラリを混在させることができます。

  1. サイドバーの「Delta Live Tables」をクリックし、 「Create Pipeline」をクリックします。

  2. パイプラインに名前を付けます。

  3. (オプション)製品のエディションを選択します。

  4. [パイプラインモード][トリガー]を選択します。

  5. パイプラインのソース コードを含む 1 つ以上のノートブックを構成します。 [パス]テキストボックスにノートブックへのパスを入力するか、ファイルピッカーアイコンを押してノートブックを選択します。

  6. パイプラインによって公開されるデータセットの宛先 ( Hive metastoreまたはUnity Catalogを選択します。 データセットの公開 を参照してください。

    • Hive metastore :

      • (オプション)パイプラインからの出力データの保存場所を入力します。ストレージの場所を空のままにすると、システムはデフォルトの場所を使用します。

      • (オプション) データセットを に公開するための ターゲット スキーマHive metastore を指定します。

    • Unity Catalog: でデータセットを公開する カタログ ターゲット スキーマUnity Catalog を指定します。

  7. (オプション) パイプラインのコンピュート設定を構成します。 コンピュート設定のオプションについては、 Delta Live Tablesのパイプライン設定を構成する」を参照してください。

  8. (オプション)[ 通知の追加 ] をクリックして、パイプライン イベントの通知を受信する 1 つ以上の電子メール アドレスを構成します。 「 パイプライン イベントの電子メール通知を追加する」を参照してください。

  9. (オプション) パイプラインの詳細設定を構成します。 詳細設定のオプションについては、 Delta Live Tablesのパイプライン設定を構成する」を参照してください。

  10. [作成]をクリックします。

[作成]をクリックすると、「パイプラインの詳細」ページが表示されます。[Delta Live Tables]タブでパイプライン名をクリックして、パイプラインにアクセスすることもできます。

パイプラインの更新を開始する

パイプラインの更新を開始するには、 Delta Live Tables開始アイコン 上部パネルのボタンをクリックします。 パイプラインが開始されていることを確認するメッセージが返されます。

更新が正常に開始された後、Delta Live Tablesシステムは次のようになります:

  1. Delta Live Tables システムによって作成されたクラスター構成を使用してクラスターを開始します。カスタム クラスター構成を指定することもできます。

  2. 存在しないテーブルを作成し、既存のテーブルのスキーマが正しいことを確認します。

  3. 使用可能な最新のデータでテーブルを更新します。

  4. アップデートが完了したらクラスターをシャットダウンします。

実行モードはデフォルトで [ 本番 ] に設定され、更新ごとに一時的なコンピュート リソースがデプロイされます。 開発モードを使用してこの動作を変更し、 開発 およびテスト中に同じコンピュート リソースを複数のパイプライン更新に使用できます。 開発モードと本番モードを参照してください。

データセットを公開する

テーブルをHive metastoreまたはUnity Catalogに公開することで、Delta Live Tables データセットをクエリに使用できるようにできます。 データをパブリッシュするターゲットを指定しない場合、 Delta Live Tablesパイプラインで作成されたテーブルには、同じパイプライン内の他の操作によってのみアクセスできます。 「Delta Live Tables パイプラインからHive metastoreへのデータのパブリッシュ」および「Delta Live Tables パイプラインでのUnity Catalogの使用」を参照してください。

ソースコードのノートブックの例

これらのノートブックを Databricks ワークスペースにインポートし、それを使用して Delta Live Tables パイプラインをデプロイできます。 「パイプラインの作成」を参照してください。

Delta Live Tables Pythonノートブックを使い始める

ノートブックを新しいタブで開く

Delta Live Tables SQLノートブックを使ってみる

ノートブックを新しいタブで開く

Unity Catalogなしのワークスペースのソースコード例

有効にしなくても、これらのノートブックをDatabricks ワークスペースにインポートし、Unity Catalog Delta Live Tablesパイプラインのデプロイに使用できます。「パイプラインの作成」を参照してください。

Delta Live Tables Pythonノートブックを使い始める

ノートブックを新しいタブで開く

Delta Live Tables SQLノートブックを使ってみる

ノートブックを新しいタブで開く