Delta Live Tables のパイプライン設定を構成する
この記事では、Delta Live Tables のパイプライン設定の構成について詳しく説明します。 Delta Live Tables には、パイプライン設定を構成および編集するためのユーザー インターフェイスがあります。 UI には、JSON で設定を表示および編集するオプションもあります。
注
ほとんどの設定は、UI または JSON 仕様のいずれかを使用して構成できます。 一部の詳細オプションは、JSON 構成を使用してのみ使用できます。
Databricks では、UI を使用して Delta Live Tables の設定をよく理解しておくことをお勧めします。 必要に応じて、ワークスペースで JSON 構成を直接編集できます。 JSON 構成ファイルは、パイプラインを新しい環境にデプロイする場合や、CLI またはREST APIを使用する場合にも役立ちます。
Delta Live Tables JSON 構成設定の完全なリファレンスについては、 「Delta Live Tables パイプライン構成」を参照してください。
製品のエディションを選択する
パイプラインの要件に最適な機能を備えた Delta Live Tables 製品エディションを選択してください。 利用可能な製品エディションは次のとおりです。
Core
ストリーミング取り込みワークロードを実行します。 パイプラインにチェンジデータキャプチャ (CDC) や Delta Live Tables のエクスペクテーションなどの高度な機能が必要ない場合は、Core
エディションを選択します。Pro
ストリーミング取り込みとCDCワークロードを実行します。Pro
製品エディションでは、Core
すべての機能に加えて、ソース データの変更に基づいてテーブルを更新する必要があるワークロードのサポートがサポートされています。Advanced
ストリーミング取り込みワークロード、CDC ワークロード、およびエクスペクテーションを必要とするワークロードを実行します。Advanced
製品エディションは、Core
エディションとPro
エディションの機能をサポートし、Delta Live Tables のエクスペクテーションに対するデータ品質制約の適用もサポートします。
パイプラインを作成または編集するときに、製品エディションを選択できます。 パイプラインごとに異なるエディションを選択できます。 Delta Live Tables 製品ページを参照してください。
注: パイプラインに、エクスペクテーションなど、選択した製品エディションでサポートされていない機能が含まれている場合は、エラーの理由を説明するエラー メッセージが表示されます。 その後、パイプラインを編集して適切なエディションを選択できます。
パイプライン モードを選択する
パイプラインは、継続的に更新することも、パイプライン モードに基づいて手動トリガーを使用して更新することもできます。 「継続的なパイプライン実行 とトリガーによるパイプライン実行」を参照してください。
クラスターポリシーを選択
Delta Live Tables Pipeline を構成および更新するには、ユーザーにはコンピュート をデプロイする権限が必要です。 ワークスペース管理者は、クラスターポリシーを構成して、ユーザーにDelta Live Tablesのコンテンツリソースへのアクセスを提供できます。 Delta Live Tablesパイプライン コンピュートでの制限の定義」を参照してください。
注
クラスターポリシーはオプションです。 Delta Live Tablesに必要なコンピュート権限がない場合は、ワークスペース管理者に確認してください。
クラスターポリシーのデフォルト値が正しく適用されるようにするには、パイプライン設定のクラスター設定で
apply_policy_default_values
値をtrue
に設定します。{ "clusters": [ { "label": "default", "policy_id": "<policy-id>", "apply_policy_default_values": true } ] }
ソースコードライブラリを構成する
Delta Live Tables UI のファイル セレクターを使用して、パイプラインを定義するソース コードを構成できます。 パイプラインのソース コードは、Databricks ノートブック、またはワークスペース ファイルに保存されている SQL または Python スクリプトで定義されます。 パイプラインを作成または編集するときに、1 つ以上のノートブックまたはワークスペース ファイル、あるいはノートブックとワークスペース ファイルの組み合わせを追加できます。
Delta Live Tables はデータセットの依存関係を自動的に分析してパイプラインの処理グラフを構築するため、ソースコードライブラリを任意の順序で追加できます。
また、JSON ファイルを変更して、ワークスペース ファイルに保存されている SQL および Python スクリプトで定義された Delta Live Tables ソース コードを含めることもできます。 次の例には、ノートブックとワークスペース ファイルが含まれています。
{
"name": "Example pipeline 3",
"storage": "dbfs:/pipeline-examples/storage-location/example3",
"libraries": [
{ "notebook": { "path": "/example-notebook_1" } },
{ "notebook": { "path": "/example-notebook_2" } },
{ "file": { "path": "/Workspace/Users/<user-name>@databricks.com/Apply_Changes_Into/apply_changes_into.sql" } },
{ "file": { "path": "/Workspace/Users/<user-name>@databricks.com/Apply_Changes_Into/apply_changes_into.py" } }
]
}
ストレージロケーションを指定する
Hive metastoreに発行するパイプラインの保存場所を指定できます。場所を指定する主な目的は、パイプラインによって書き込まれるデータのオブジェクトストレージの場所を制御することです。
Delta Live Tables パイプラインのすべてのテーブル、データ、チェックポイント、およびメタデータは Delta Live Tables によってフル管理されるため、Delta Live Tables データセットとのほとんどのやり取りは、 Hive metastore または Unity Catalogに登録されているテーブルを介して行われます。
パイプライン出力テーブルのターゲットスキーマを指定する
オプションではありますが、新しいパイプラインの開発とテストから移行するときはいつでも、パイプラインによって作成されたテーブルを公開するターゲットを指定する必要があります。 パイプラインをターゲットに公開すると、データセットを Databricks 環境内の他の場所でクエリできるようになります。 Delta Live Tablesから にデータを公開する方法、Hive metastore または パイプラインで Unity Catalogを使用する方法Delta Live Tables を参照してください。
コンピュートの設定を構成する
各デルタ ライブ テーブル パイプラインには、次の 2 つのクラスターが関連付けられています。
updates
クラスターは、パイプラインの更新を処理します。maintenance
cluster 実行 日常メンテナンスタスク.
これらのクラスターで使用される設定は、パイプライン設定で指定された clusters
属性によって決まります。
クラスターラベルを使用すると、特定のクラスタータイプにのみ適用されるコンピュート設定を追加できます。 パイプライン クラスターを構成するときに使用できるラベルは 3 つあります。
注
クラスター構成を 1 つだけ定義する場合は、クラスター・ラベル設定を省略できます。 default
ラベルの設定が指定されていない場合、ラベルはクラスター構成に適用されます。クラスターラベル設定は、クラスターの種類ごとに設定をカスタマイズする必要がある場合にのみ必要です。
default
ラベルは、updates
クラスターとmaintenance
クラスターの両方に適用されるコンピュート設定を定義します。 両方の クラスター に同じ設定を適用すると、ストレージの場所のデータ アクセス資格情報などの必要な構成がメンテナンス クラスターに確実に適用されるため、メンテナンス実行 の信頼性が向上します。maintenance
ラベルは、maintenance
クラスターにのみ適用されるコンピュート設定を定義します。 また、maintenance
ラベルを使用して、default
ラベルで構成された設定を上書きすることもできます。updates
ラベルは、updates
クラスターにのみ適用される設定を定義します。updates
ラベルを使用して、maintenance
クラスターに適用しない設定を構成します。
default
ラベルと updates
ラベルを使用して定義された設定がマージされ、 updates
クラスターの最終的な構成が作成されます。default
ラベルと updates
ラベルの両方を使用して同じ設定が定義されている場合、 updates
ラベルで定義された設定は、 default
ラベルで定義された設定をオーバーライドします。
次の例では、 updates
クラスターの構成にのみ追加される Spark 構成パラメーターを定義します。
{
"clusters": [
{
"label": "default",
"autoscale": {
"min_workers": 1,
"max_workers": 5,
"mode": "ENHANCED"
}
},
{
"label": "updates",
"spark_conf": {
"key": "value"
}
}
]
}
Delta Live Tablesには、 Databricksの他のコンピュートと同様のクラスター設定オプションがあります。 他のパイプライン設定と同様に、クラスターの JSON 構成を変更して、UI に存在しないオプションを指定できます。 コンピュートを参照してください。
注
Delta Live Tables ランタイムはパイプライン クラスターのライフサイクルを管理し、 Databricks Runtimeのカスタム バージョンを実行するため、Spark バージョンやクラスター名など、パイプライン構成で一部のクラスター設定を手動で設定することはできません。 ユーザーが設定できないクラスター属性を参照してください。
Delta Live Tables パイプラインは、Photon を利用するように構成できます。 「Photonとは」を参照してください。
パイプラインを実行するインスタンスタイプの選択
デフォルトでは、Delta Live Tables によってパイプラインを実行するドライバー ノードとワーカー ノードのインスタンス タイプが選択されますが、インスタンス タイプを手動で構成することもできます。 たとえば、インスタンスタイプを選択して、パイプラインのパフォーマンスを向上させたり、パイプラインの実行時にメモリの問題に対処したりできます。 インスタンスの種類は、REST API を使用してパイプライン を作成 または 編集 するとき、または Delta Live Tables UI で構成できます。
Delta Live Tables UI でパイプラインを作成または編集するときにインスタンスタイプを設定するには:
[ 設定 ]ボタンをクリックします。
パイプライン設定の[詳細]セクションの[ワーカー タイプ]および[ドライバー タイプ]ドロップダウン メニューで、パイプラインのインスタンス タイプを選択します。
パイプラインの JSON 設定でインスタンス タイプを構成するには、 [JSON]ボタンをクリックし、クラスター構成にインスタンス タイプの構成を入力します。
注
maintenance
クラスターに不要なリソースが割り当てられないようにするために、この例では updates
ラベルを使用して、updates
クラスターのみのインスタンスタイプを設定します。インスタンスタイプを updates
クラスターと maintenance
クラスターの両方に割り当てるには、 default
ラベルを使用するか、ラベルの設定を省略します。 default
ラベルは、ラベルの設定が指定されていない場合、パイプライン クラスター構成に適用されます。「コンピュートの設定」を参照してください。
{
"clusters": [
{
"label": "updates",
"node_type_id": "n1-highmem-16",
"driver_node_type_id": "n1-standard-4",
"..." : "..."
}
]
}
オートスケールを使用して効率を高め、リソース使用量を削減します
拡張オートスケールを使用して、パイプラインのクラスター使用率を最適化します。 拡張オートスケールは、システムがそれらのリソースによってパイプライン処理速度が向上すると判断した場合にのみ、追加のリソースを追加します。 リソースは不要になると解放され、すべてのパイプラインの更新が完了するとすぐにクラスターはシャットダウンされます。
構成の詳細を含む拡張オートスケールの詳細については、「拡張オートスケールを使用してDelta Live Tables Pipeline のクラスター使用率を最適化する」を参照してください。
コンピュートのシャットダウンを遅らせる
Delta Live Tables クラスターは使用されていないときに自動的にシャットダウンするため、クラスター構成で autotermination_minutes
を設定するクラスターポリシーを参照するとエラーが発生します。 クラスターのシャットダウン動作を制御するには、開発モードまたは実稼働モードを使用するか、パイプライン設定で pipelines.clusterShutdown.delay
設定を使用します。 次の例では、 pipelines.clusterShutdown.delay
値を 60 秒に設定します。
{
"configuration": {
"pipelines.clusterShutdown.delay": "60s"
}
}
production
モードが有効になっている場合、 pipelines.clusterShutdown.delay
のデフォルト値は 0 seconds
です。development
モードが有効になっている場合、デフォルト値は 2 hours
です。
シングルノードクラスターを作成する
クラスター設定でnum_workers
を 0 に設定すると、クラスターは単一ノード クラスターとして作成されます。 オートスケール クラスターを構成し、 min_workers
を 0 に設定し、 max_workers
0 に設定すると、単一ノード クラスターも作成されます。
オートスケール クラスターを構成し、 min_workers
のみを 0 に設定すると、クラスターは単一ノード クラスターとして作成されません。 クラスターには、終了するまで常に少なくとも 1 つのアクティブなワーカーがあります。
Delta Live Tablesでシングルノードクラスタを作成するためのクラスタ設定の例:
{
"clusters": [
{
"num_workers": 0
}
]
}
クラスタータグの設定
クラスタータグを使用して、パイプライン クラスター の使用状況を監視できます。 パイプラインを作成または編集するとき、またはパイプライン クラスターの JSON 設定を編集して、Delta Live Tables UI でクラスタータグを追加します。
クラウドストレージ構成
Google Cloud Storage(GCS)のバケットにアクセスするには、そのGCSバケットにアクセスできるサービスアカウントを作成し、そのサービスアカウントをクラスター設定に追加する必要があります。 Google クラウド サービス アカウントの作成の詳細については、「 Google クラウド ストレージに接続する」を参照してください。 サービス アカウント構成は、Delta Live Tables API または Delta Live Tables UI を使用してパイプライン を作成 または 編集 するときに追加できます。
パイプラインの [パイプラインの詳細 ] ページで、[ 設定 ] ボタンをクリックします。 [パイプライン設定] ページが表示されます。
[ JSON] ボタンをクリックします。
クラスター構成の
gcp_attributes.google_service_account
フィールドにサービス アカウント構成を入力します。
{
"clusters": [
{
"gcp_attributes": {
"google_service_account": "test-gcs-doc@databricks-dev.iam.gserviceaccount.com"
}
}
]
}
Python または SQL でデータセット宣言をパラメータ化する
データセットを定義する Python および SQL コードは、パイプラインの設定によってパラメーター化できます。 パラメーター化により、次のユース ケースが可能になります。
長いパスやその他の変数をコードから分離する。
開発環境またはステージング環境で処理されるデータの量を減らして、テストを高速化します。
同じ変換ロジックを再利用して、複数の Data から処理する。
次の例では、 startDate
構成値を使用して、開発パイプラインを入力データのサブセットに制限します。
CREATE OR REFRESH MATERIALIZED VIEW customer_events
AS SELECT * FROM sourceTable WHERE date > '${mypipeline.startDate}';
@dlt.table
def customer_events():
start_date = spark.conf.get("mypipeline.startDate")
return read("sourceTable").where(col("date") > start_date)
{
"name": "Data Ingest - DEV",
"configuration": {
"mypipeline.startDate": "2021-01-02"
}
}
{
"name": "Data Ingest - PROD",
"configuration": {
"mypipeline.startDate": "2010-01-02"
}
}
パイプラインのトリガー間隔
pipelines.trigger.interval
を使用して、テーブルまたはパイプライン全体を更新するフローのトリガー間隔を制御できます。トリガーされたパイプラインは各テーブルを 1 回だけ処理するため、 pipelines.trigger.interval
は連続パイプラインでのみ使用されます。
Databricks では、ストリーミングとバッチのクエリのデフォルトが異なるため、個々のテーブルに pipelines.trigger.interval
を設定することをお勧めします。 パイプラインの値を設定するのは、処理でパイプライン グラフ全体の更新を制御する必要がある場合のみです。
テーブルに pipelines.trigger.interval
を設定するには、Python で spark_conf
、または SQL で SET
を使用します。
@dlt.table(
spark_conf={"pipelines.trigger.interval" : "10 seconds"}
)
def <function-name>():
return (<query>)
SET pipelines.trigger.interval=10 seconds;
CREATE OR REFRESH MATERIALIZED VIEW TABLE_NAME
AS SELECT ...
パイプラインに pipelines.trigger.interval
を設定するには、パイプライン設定で configuration
オブジェクトに追加します。
{
"configuration": {
"pipelines.trigger.interval": "10 seconds"
}
}
管理者以外のユーザーが Unity Catalog対応パイプラインからドライバー ログを表示できるようにする
デフォルトでは、パイプライン所有者とワークスペース管理者のみが、Unity Catalog対応パイプラインを実行するクラスターからドライバー ログを表示する権限を持っています。 パイプライン設定の オブジェクトに次の 構成を追加することでCAN MANAGE 、CAN VIEW またはCAN RUN 権限をSparkconfiguration
持つ任意のユーザーのドライバー ログへのアクセスを有効にできます。
{
"configuration": {
"spark.databricks.acl.needAdminPermissionToViewLogs": "false"
}
}
パイプラインイベントのEメール 通知を追加する
次の場合に通知を受信するように 1 つ以上の Eメール アドレスを設定できます。
パイプラインの更新が正常に完了します。
パイプラインの更新は、再試行可能または再試行不可能なエラーで失敗します。 すべてのパイプライン障害の通知を受信するには、このオプションを選択します。
パイプラインの更新は、再試行できない (致命的な) エラーで失敗します。 このオプションを選択すると、再試行できないエラーが発生したときにのみ通知を受け取ります。
1 つのデータ フローが失敗します。
パイプライン を作成 または編集するときに電子メール通知を構成するには、次のようにします。
[ 通知を追加] をクリックします。
通知を受信する 1 つ以上の Eメール アドレスを入力します。
設定したEメールアドレスに送信する通知の種類ごとにチェックボックスをクリックします。
[ 通知を追加] をクリックします。
SCD タイプ 1 の tombstone 管理の制御
以下の設定を使用して、SCD タイプ 1 処理中の DELETE
イベントの廃棄管理の動作を制御できます。
pipelines.applyChanges.tombstoneGCThresholdInSeconds
: この値は、順不同データ間の予想される最大間隔 (秒単位) と一致するように設定します。デフォルトは 172800 秒 (2 日) です。pipelines.applyChanges.tombstoneGCFrequencyInSeconds
: この設定は、廃棄標識のクリーンアップをチェックする頻度を秒単位で制御します。デフォルトは 1800 秒 (30 分) です。
APPLY CHANGES APIs参照してください: Delta Live Tablesを使用してチェンジデータキャプチャを簡素化します。
パイプラインの権限を構成する
パイプラインの権限を管理するには、パイプラインに対するCAN MANAGE
またはIS OWNER
権限が必要です。
サイドバーで、 「Delta Live Tables」をクリックします。
パイプラインの名前を選択します。
ケバブメニュー をクリックし、[ 権限]を選択します。
[権限設定]で、 [ユーザー、グループ、またはサービス プリンシパルの選択]ドロップダウン メニューを選択し、ユーザー、グループ、またはサービス プリンシパルを選択します。
権限のドロップダウンメニューから権限を選択します。
「追加」をクリックします。
「 保存」をクリックします。
Delta Live TablesのRocksDBの状態ストアを有効にする
パイプラインをデプロイする前に次の構成を設定することで、RocksDB ベースの状態管理を有効にできます。
{
"configuration": {
"spark.sql.streaming.stateStore.providerClass": "com.databricks.sql.streaming.state.RocksDBStateStoreProvider"
}
}
RocksDB の構成に関する推奨事項など、RocksDB 状態ストアの詳細については、 「Databricks で RocksDB 状態ストアを構成する」を参照してください。