Delta Live Tables パイプラインで更新を実行する

この記事では、Delta Live Tables パイプラインの更新の概要と、その実行方法について説明します。

パイプラインを作成して実行する準備ができたら、 更新を開始します。 パイプラインの更新では、次の処理が行われます。

  • 正しい構成でクラスターを起動します。

  • 定義されているすべてのテーブルとビューを検出し、無効な列名、依存関係の欠落、構文エラーなどの分析エラーをチェックします。

  • 使用可能な最新のデータでテーブルとビューを作成または更新します。

パイプラインのソース コードの問題は、 検証更新を使用してテーブルが作成または更新されるのを待たずにチェックできます。 Validate 機能は、パイプラインの開発またはテスト時に、テーブル名や列名が正しくないなど、パイプライン内のエラーをすばやく見つけて修正できるため便利です。

パイプラインの作成方法については、 「チュートリアル: 初めての Delta Live Tables パイプラインを実行する」を参照してください。

パイプラインの更新 を開始する

Databricks には、パイプラインの更新を開始するための次のようないくつかのオプションが用意されています。

  • Delta Live Tables UI には、次のオプションがあります。

    • パイプラインの詳細ページのDelta Live Tables スタートアイコン ボタンをクリックします。

    • パイプラインの一覧から、[右矢印アイコン アクション ] 列をクリックします 。

  • ノートブックで更新を開始するには、ノートブック ツール バーの [開始Delta Live Tables >] をクリックします。「 ノートブックから Delta Live Tables パイプラインを開く、または実行する」を参照してください。

  • API または CLI を使用してプログラムでパイプラインをトリガーできます。 API ガイドDelta Live Tables を参照してください。

  • パイプラインは、 Delta Live Tables UI またはジョブ UI を使用してジョブとしてスケジュールできます。 「 パイプラインのスケジュール」を参照してください。

Delta Live Tables がテーブルとビュー を更新する方法

更新されるテーブルとビュー、およびそれらのテーブルのビューの更新方法は、更新の種類によって異なります。

  • すべて更新: すべてのライブ テーブルが更新され、入力 DATA の現在の状態が反映されます。 すべてのストリーミング テーブルで、新しい行がテーブルに追加されます。

  • すべて完全更新: すべてのライブ テーブルが更新され、入力 DATA の現在の状態が反映されます。 すべてのストリーミング テーブルについて、Delta Live Tables は各テーブルからすべてのデータをクリアしてから、ストリーミング ソースからすべてのデータをロードしようとします。

  • 選択範囲の更新: refresh selection の動作は refresh allと同じですが、選択したテーブルのみを更新できます。 選択したライブテーブルが更新され、入力 DATA の現在の状態が反映されます。 選択したストリーミングテーブルでは、新しい行がテーブルに追加されます。

  • 完全更新の選択: full refresh selection の動作は full refresh allと同じですが、選択した表のみの完全更新を実行できます。 選択したライブテーブルが更新され、入力 DATA の現在の状態が反映されます。 選択したストリーミングテーブルについて、Delta Live Tables は各テーブルからすべてのデータをクリアしてから、ストリーミングソースからすべてのデータをロードしようとします。

既存のライブテーブルの場合、更新の動作はマテリアライズドビューの SQL REFRESH と同じです。 新しいライブテーブルの場合、動作は SQL CREATE 操作と同じです。

選択したテーブルの パイプライン更新を開始する

パイプライン内の選択したテーブルのみのデータを再処理できます。 たとえば、開発中に 1 つのテーブルのみを変更してテスト時間を短縮したり、パイプラインの更新が失敗して失敗した テーブルのみを更新したりします。

トリガーされたパイプラインでのみ選択的更新を使用できます。

選択したテーブルのみを更新する更新を開始するには、 [パイプラインの詳細 ] ページで:

  1. [ 更新するテーブルの選択] をクリックします。 [ 更新するテーブルの選択 ] ダイアログが表示されます。

    [ 更新するテーブルの選択 ] ボタンが表示されない場合は、[ パイプラインの詳細 ] ページに最新の更新が表示され、更新が完了していることを確認してください。 更新に失敗したなどの理由で、最新の更新プログラムの DAG が表示されない場合、[ 更新するテーブルの選択 ] ボタンは表示されません。

  2. 更新するテーブルを選択するには、各テーブルをクリックします。 選択したテーブルが強調表示され、ラベルが付けられます。 更新からテーブルを削除するには、テーブルをもう一度クリックします。

  3. [ 選択範囲の更新] をクリックします。

    [ 選択範囲の更新] ボタンには、選択したテーブルの数が括弧内に表示されます。

選択したテーブルに既に取り込まれているデータを再処理するには、[ブルーダウンキャレット 選択範囲の更新] ボタンの横をクリックし 、[選択範囲の完全更新 ] をクリックします。

失敗したテーブルの パイプライン更新を開始する

パイプライン グラフ内の 1 つ以上のテーブルのエラーが原因でパイプラインの更新が失敗した場合は、失敗したテーブルとダウンストリームの依存関係のみの更新を開始できます。

除外されたテーブルは、失敗したテーブルに依存している場合でも更新されません。

失敗したテーブルを更新するには、「 パイプラインの詳細 」ページで、「 失敗したテーブルの更新」をクリックします。

選択した失敗したテーブルのみを更新するには:

  1. ボタンダウン [ 失敗したテーブルの更新 ] ボタンの横をクリックし 、[ 更新するテーブルの選択 ] をクリックします。[ 更新するテーブルの選択 ] ダイアログが表示されます。

  2. 更新するテーブルを選択するには、各テーブルをクリックします。 選択したテーブルが強調表示され、ラベルが付けられます。 更新からテーブルを削除するには、テーブルをもう一度クリックします。

  3. [ 選択範囲の更新] をクリックします。

    [ 選択範囲の更新] ボタンには、選択したテーブルの数が括弧内に表示されます。

選択したテーブルに既に取り込まれているデータを再処理するには、[ブルーダウンキャレット 選択範囲の更新] ボタンの横をクリックし 、[選択範囲の完全更新 ] をクリックします。

テーブルが更新されるのを待たずにパイプラインでエラーをチェックする

プレビュー

Delta Live Tables Validate の更新機能は パブリック プレビュー段階です。

完全な更新を実行せずにパイプラインのソース コードが有効かどうかを確認するには、 [検証] を使用します。 Validate更新では、パイプラインで定義されているデータセットとフローの定義が解決されますが、データセットの具体化や発行は行われません。検証中に見つかったエラー (テーブル名や列名が正しくないなど) は、UI で報告されます。

Validate更新を実行するには、パイプラインの詳細ページで [ブルーダウンキャレット 開始 ] の横にある をクリックし 、[ 検証] をクリックします。

Validateの更新が完了すると、イベント ログにはValidateの更新に関連するイベントのみが表示され、DAG にはメトリクスは表示されません。エラーが見つかった場合は、イベント ログに詳細が表示されます。

最新の Validate 更新の結果のみが表示されます。 Validate更新が最後に実行された更新であった場合は、更新履歴でそれを選択して結果を確認できます。Validate更新後に別の更新が実行されると、結果は UI で使用できなくなります。

継続的なパイプライン実行とトリガーされたパイプライン実行

パイプラインで トリガー実行 モードが使用されている場合、システムはパイプライン内のすべてのテーブルまたは選択したテーブルを一度正常に更新した後に処理を停止し、更新の開始時に使用可能なデータに基づいて更新の一部である各テーブルが確実に更新されます。

パイプラインで 連続 実行が使用されている場合、Delta Live Tables は、パイプライン全体のテーブルを最新の状態に保ちます。

実行モードは、コンピュートであるテーブルの種類とは無関係です。 マテリアライズドビューとストリーミングテーブルはどちらも、どちらの実行モードでも更新できます。 連続実行モードでの不要な処理を回避するために、パイプラインは依存 Delta テーブルを自動的に監視し、それらの依存テーブルの内容が変更された場合にのみ更新を実行します。

Delta Live Tables ランタイムは、デルタ以外のデータ ソースの変更を検出できません。テーブルは引き続き定期的に更新されますが、過度の再計算によってクラスターで発生する増分処理の速度が低下するのを防ぐために、デフォルトのトリガー間隔が長くなります。

データパイプラインの実行モード を比較する表

次の表に、これらの実行モードの違いを示します。

トリガー

連続

更新はいつ停止しますか?

完了すると自動的に。

手動で停止するまで継続的に実行されます。

どのようなデータが処理されますか?

更新の開始時に使用可能なデータ。

設定されたソースに到着したすべてのデータ。

これはどのようなデータ鮮度要件に最適ですか?

データの更新は、10 分ごと、毎時、または毎日実行されます。

10秒ごとから数分の間に必要なデータ更新。

トリガーされたパイプラインは、クラスターがパイプラインを実行するのに十分な時間しか実行されないため、リソースの消費と費用を削減できます。 ただし、パイプラインがトリガーされるまで、新しいデータは処理されません。 継続的なパイプラインには常時稼働中のクラスターが必要であり、コストは高くなりますが、処理の待機時間は短くなります。

設定の [パイプライン モード ] オプションを使用して実行モードを構成できます。

パイプライン境界 の選び方

Delta Live Tablesパイプラインは、単一のテーブル、依存関係のある多数のテーブル、関係のない多数のテーブル、または依存関係のあるテーブルの複数の独立したフローに対する更新を処理できます。 このセクションには、パイプラインを分割する方法を決定するのに役立つ考慮事項が含まれています。

大規模な Delta Live Tables パイプラインには多くの利点があります。 これらには以下が含まれます。

  • クラスター リソースをより効率的に使用します。

  • ワークスペース内のパイプラインの数を減らします。

  • ワークフローオーケストレーションの複雑さを軽減します。

処理パイプラインの分割方法に関する一般的な推奨事項には、次のようなものがあります。

  • チーム境界で機能を分割します。 たとえば、データ チームがデータを変換するためのパイプラインを維持し、データアナリストが変換されたデータを分析するパイプラインを維持する場合があります。

  • アプリケーション固有の境界で機能を分割して、結合を減らし、共通の機能の再利用を容易にします。

開発モードと本番モード

パイプラインの実行を最適化するには、開発モードと本番モードを切り替えます。 Delta Live Tables 環境切り替えアイコン パイプライン UI のボタンを使用して、これら 2 つのモードを切り替えます。デフォルトにより、パイプラインは開発モードで実行されます。

パイプラインを開発モードで実行すると、 Delta Live Tables システムは次の処理を行います。

  • クラスターを再利用して、再起動のオーバーヘッドを回避します。 デフォルトでは、開発モードが有効になっている場合、クラスターは 2 時間実行されます。 これは、[コンピュートの設定の構成] の [pipelines.clusterShutdown.delay] 設定で変更できます。

  • パイプラインの再試行を無効にして、エラーをすぐに検出して修正できるようにします。

本番モードでは、 Delta Live Tables システムは以下を実行します。

  • メモリリークや古い認証情報など、特定の回復可能なエラーに対してクラスターを再起動します。

  • クラスターの起動に失敗した場合など、特定のエラーが発生した場合に実行を再試行します。

開発モードと本番モードの切り替えは、クラスターとパイプラインの実行動作のみを制御します。 テーブルを発行するためのカタログ内のストレージの場所とターゲット スキーマは、パイプライン設定の一部として構成する必要があり、モードを切り替えても影響を受けません。

パイプライン をスケジュールする

トリガーされたパイプラインを手動で開始することも、Databricks ジョブを使用してスケジュールに従ってパイプラインを実行することもできます。 Delta Live Tables UI で 1 つのパイプライン タスクを含むジョブを直接作成してスケジュールすることも、ジョブ UI でマルチタスク ワークフローにパイプライン タスクを追加することもできます。

Delta Live Tables UI で単一タスクのジョブとそのジョブのスケジュールを作成するには:

  1. [ スケジュール] > [スケジュールの追加] をクリックします。 [ スケジュール] ボタンが更新され、パイプラインが 1 つ以上のスケジュールされたジョブに含まれている場合 (スケジュール (5) など) 既存のスケジュールの数が表示されます。

  2. [ ジョブ名 ] フィールドにジョブの名前を入力します。

  3. [スケジュール] を [スケジュール済み] に設定します。

  4. 期間、開始時刻、およびタイム ゾーンを指定します。

  5. パイプラインの開始、成功、または失敗に関するアラートを受信するように 1 つ以上の Eメール アドレスを構成します。

  6. [作成]をクリックします。