Delta Live Tablesパイプライン で Unity Catalog を使用する

プレビュー

Unity Catalog Delta Live Tables のサポートは パブリック プレビュー 段階です。

Hive metastore にテーブルを永続化するための既存のサポートに加えて、DeltaLive Tables パイプラインで Unity Catalog を使用して次のことができます。

  • パイプラインがテーブルを保持する Unity Catalog カタログを定義します。

  • Unity Catalog テーブルからデータを読み取ります。

ワークスペースには、 Unity Catalog または Hive metastoreを使用するパイプラインを含めることができます。 ただし、1 つのパイプラインが Hive metastore と Unity Catalog の両方に書き込むことはできず、既存のパイプラインを Unity Catalogを使用するようにアップグレードすることはできません。 Unity Catalog を使用しない既存のパイプラインは、このプレビューの影響を受けず、構成されたストレージの場所を使用して Hive metastore にデータを引き続き保持します。

このドキュメントで特に指定されていない限り、既存のデータ ソースと Delta Live Tables 機能はすべて、 Unity Catalogを使用するパイプラインでサポートされます。 Python インターフェイスと SQL インターフェイスはどちらも、 Unity Catalogを使用するパイプラインでサポートされています。

The tables created in your pipeline can also be queried from shared Unity Catalog clusters using Databricks Runtime 13.3 LTS and above or a SQL warehouse. Tables cannot be queried from assigned or no isolation clusters.

Unity Catalog パイプラインによって作成されたテーブルに対するアクセス許可を管理するには、 GRANT と REVOKE を使用します。

要件

Delta Live Tables パイプラインからUnity Catalog にテーブルを作成するには、以下が必要です。

  • ターゲット・カタログに対する USE CATALOG 特権が必要です。

  • パイプラインで 具体化されたビューを作成する場合は、ターゲットスキーマで CREATE MATERIALIZED VIEW 権限と USE SCHEMA 権限が必要です。

  • パイプラインで ストリーミングテーブルを作成する場合は、ターゲットスキーマで CREATE TABLE 権限と USE SCHEMA 権限が必要です。

  • パイプライン設定でターゲットスキーマが指定されていない場合は、ターゲットカタログ内の少なくとも 1 つのスキーマに対する CREATE MATERIALIZED VIEW 権限または CREATE TABLE 権限が必要です。

制限

Delta Live Tables でUnity Catalog を使用する場合の制限事項を次に示します。

  • 既定では、パイプラインの所有者とワークスペース管理者のみが、Unity Catalog 対応パイプラインを実行するクラスターのドライバー ログを表示するアクセス許可を持っています。 他のユーザーがドライバー ログを表示できるようにするには、「 管理者以外のユーザーが Unity Catalog 対応パイプラインからドライバー ログを表示できるようにする」を参照してください。

  • Hive metastore を使用する既存のパイプラインは、 Unity Catalogを使用するようにアップグレードすることはできません。に書き込む既存のパイプラインを移行するには Hive metastoreに新しいパイプラインを作成し、データソースからデータを再取り込みする必要があります。

  • サードパーティのライブラリと JAR はサポートされていません。

  • ストリーミング テーブルのスキーマを変更するデータ操作言語 (DML) クエリー はサポートされていません。

  • Delta Live Tables パイプラインで作成された具体化されたビューは、そのパイプラインの外部 (別のパイプラインやダウンストリーム ノートブックなど) のストリーミング ソースとして使用することはできません。

  • 管理されたストレージの場所を指定するスキーマへの発行は、 プレビュー チャネルでのみサポートされます。

  • パイプラインがマネージド ストレージの場所を持つスキーマに発行する場合、スキーマは後続の更新で変更できますが、更新されたスキーマが以前に指定したスキーマと同じストレージの場所を使用している場合に限ります。

  • ターゲット スキーマで格納場所が指定されている場合、すべてのテーブルがそこに格納されます。 スキーマの保管場所が指定されていない場合、ターゲット・カタログで指定されていれば、表はカタログの保管場所に保管されます。 スキーマとカタログの格納場所が指定されていない場合、テーブルは、テーブルがパブリッシュされるメタストアのルート格納場所に格納されます。

  • カタログ エクスプローラーの [ 履歴 ] タブには、ストリーミング テーブルと具体化されたビューの履歴は表示されません。

  • LOCATION プロパティは、テーブルの定義時にはサポートされません。

  • Unity カタログ対応パイプラインは Hive metastoreに発行できません。

  • Python UDFs はサポートされていません。

  • Unity Catalog にパブリッシュされた DeltaLive Tablesマテリアライズドビューまたはストリーミングテーブルで Delta Sharing を使用することはできません。

  • パイプラインまたはクエリーで event_log テーブル値関数 を使用して、複数のパイプラインのイベント ログにアクセスすることはできません。

  • event_log テーブル値関数 で作成されたビューを他のユーザーと共有することはできません。

  • 単一ノードクラスターは、Unity カタログ対応パイプラインではサポートされていません。 Delta Live Tables は、より小さなパイプラインを実行するために単一ノードクラスターを作成する可能性があるため、パイプラインが失敗し、 single-node modeを参照するエラーメッセージが表示されることがあります。 これが発生した場合は、 コンピュート設定を構成するときに、少なくとも 1 人のワーカーを指定していることを確認してください。

  • Tables created in a Unity Catalog-enabled pipeline cannot be queried from assigned or no isolation clusters. To query tables created by a Delta Live Tables pipeline, you must use a shared access mode cluster using Databricks Runtime 13.3 LTS and above or a SQL warehouse.

  • Delta Live Tables では、共有アクセス モード クラスターを使用して、Unity Catalog 対応パイプラインを実行します。 Unity Catalog 対応パイプラインは、割り当てられたクラスターでは実行できません。 Unity Catalog での共有アクセス モードの制限事項については、「 Unity Catalog での共有アクセス モードの制限事項」を参照してください。

  • マテリアライズドビューまたはストリーミングテーブルで 行フィルターまたは列マスク を Unity Catalog使用することはできません。

具体化されたビューをサポートする基になるファイルには、具体化されたビューの定義に表示されないアップストリームテーブルのデータ (個人を特定できる可能性のある情報を含む) が含まれる場合があります。 このデータは、具体化されたビューの増分更新をサポートするために、基になるストレージに自動的に追加されます。

具体化されたビューの基になるファイルは、具体化されたビュー スキーマの一部ではないアップストリーム テーブルからのデータを公開するリスクがあるため、Databricks では、基になるストレージを信頼されていないダウンストリーム コンシューマーと共有しないことをお勧めします。

たとえば、マテリアライズドビューの定義に COUNT(DISTINCT field_a) 句が含まれているとします。 マテリアライズドビュー定義には aggregate COUNT DISTINCT 句のみが含まれていますが、基になるファイルには field_aの実際の値のリストが含まれます。

既存の機能に対する変更

Unity Catalog にデータを保持するように Delta Live Tables が構成されている場合、テーブルのライフサイクルは Delta Live Tables パイプラインによって管理されます。パイプラインはテーブルのライフサイクルとアクセス許可を管理するため、次のようになります。

  • Delta Live Tables パイプライン定義からテーブルが削除されると、対応するマテリアライズドビューまたはストリーミングテーブルエントリは、次のパイプライン更新時に Unity Catalog から削除されます。 実際のデータは、誤って削除された場合に回復できるように、一定期間保持されます。 データは、具体化されたビューまたはストリーミング テーブルをパイプライン定義に追加し直すことで回復できます。

  • Delta Live Tables パイプラインを削除すると、そのパイプラインで定義されているすべてのテーブルが削除されます。 この変更により、 Delta Live Tables UI が更新され、パイプラインの削除を確認するメッセージが表示されます。

  • 内部バッキング・テーブル ( APPLY CHANGES INTOをサポートするために使用されるバッキング・テーブルを含む) には、ユーザーが直接アクセスすることはできません。

Delta Live Tables パイプライン からUnity Catalog するテーブルを書き込む

テーブルを Unity Catalog に書き込むには、パイプラインを作成するときに、 [ストレージ オプション] で [Unity Catalog] を選択し、 [カタログ] ドロップダウン メニューでカタログを選択し、 [ターゲット スキーマ] ドロップダウン メニューでデータベース名を選択します。Unity Catalog カタログの詳細については、「 カタログ」を参照してください。 Unity Catalog のスキーマについては、「 スキーマ」を参照してください。

Unity Catalog パイプラインにデータを取り込む

Unity Catalog を使用するように構成されたパイプラインは、以下からデータを読み取ることができます。

  • Unity Catalog マネージドテーブルと外部テーブル、ビュー、具体化されたビュー、ストリーミングテーブルがあります。

  • テーブルとビューHive metastore 。

  • Auto Loader cloud_files() 機能を使用して Unity Catalog 外部から読み取ります。

  • Apache Kafka と Amazon Kinesis。

次に、 Unity Catalog テーブルと Hive metastore テーブルからの読み取り例を示します。

Unity Catalog テーブルからのバッチ インジェスト

CREATE OR REFRESH LIVE TABLE
  table_name
AS SELECT
  *
FROM
  my_catalog.my_schema.table1;
@dlt.table
def table_name():
  return spark.table("my_catalog.my_schema.table")

Unity Catalog テーブルからのストリーム変更

CREATE OR REFRESH STREAMING TABLE
  table_name
AS SELECT
  *
FROM
  STREAM(my_catalog.my_schema.table1);
@dlt.table
def table_name():
  return spark.readStream.table("my_catalog.my_schema.table")

Hive metastore からデータを取り込む

Unity Catalog を使用するパイプラインは、 hive_metastore カタログを使用して Hive metastore テーブルからデータを読み取ることができます。

CREATE OR REFRESH LIVE TABLE
  table_name
AS SELECT
  *
FROM
  hive_metastore.some_schema.table;
@dlt.table
def table3():
  return spark.table("hive_metastore.some_schema.table")

Auto Loader からデータを取り込む

CREATE OR REFRESH STREAMING TABLE
  table_name
AS SELECT
  *
FROM
  cloud_files(
    <path-to-uc-external-location>,
    "json"
  )
@dlt.table(table_properties={"quality": "bronze"})
def table_name():
  return (
     spark.readStream.format("cloudFiles")
     .option("cloudFiles.format", "json")
     .load(f"{path_to_uc_external_location}")
 )

マテリアライズドビュー(ライブテーブル)を共有する

Default では、パイプラインによって作成されたテーブルは、パイプライン所有者のみがクエリーできます。 他のユーザーに GRANT ステートメントを使用して表クエリーを実行できるようにしたり、 REVOKE ステートメントを使用してクエリー・アクセスを取り消すことができます。Unity Catalogの権限について詳しくは、 Unity Catalogでの権限の管理を参照してください。

テーブルの選択を許可

GRANT SELECT ON TABLE
  my_catalog.my_schema.live_table
TO
  `user@databricks.com`

テーブルの選択を取り消す

REVOKE SELECT ON TABLE
  my_catalog.my_schema.live_table
FROM
  `user@databricks.com`

テーブルの作成権限またはマテリアライズドビューの作成権限の付与

GRANT CREATE { MATERIALIZED VIEW | TABLE } ON SCHEMA
  my_catalog.my_schema
TO
  { principal | user }

パイプラインの系列を表示する

Delta Live Tablesパイプライン内のテーブルのリネージは、カタログ エクスプローラーに表示されます。 Unity カタログ対応パイプラインのマテリアライズド ビューまたはストリーミング テーブルの場合、カタログ エクスプローラー リネージ UI には上流テーブルと下流テーブルが表示されます。 Unity Catalogリネージの詳細については、 「 Unity Catalogを使用したデータ リネージのキャプチャと表示」を参照してください。

Unity カタログ対応の Delta Live Tables パイプラインの具体化されたビューまたはストリーミング テーブルの場合、パイプラインが現在のワークスペースからアクセス可能な場合、カタログ エクスプローラー リネージ UI は、具体化されたビューまたはストリーミング テーブルを生成したパイプラインにもリンクします。

ストリーミングテーブルのデータを追加、変更、または削除する

データ操作言語 (DML) ステートメント (挿入、更新、削除、マージ ステートメントなど) を使用して、Unity Catalog に発行されたストリーミング テーブルを変更できます。ストリーミングテーブルに対する DML クエリーのサポートにより、EU 一般データ保護規則 (GDPR) コンプライアンスのためのテーブルの更新などのユースケースが可能になります。

  • ストリーミングテーブルのテーブルスキーマを変更する DML ステートメントはサポートされていません。 DML ステートメントがテーブルスキーマを進化させようとしないことを確認します。

  • DML statements that update a streaming table can be run only in a shared Unity Catalog cluster or a SQL warehouse using Databricks Runtime 13.3 LTS and above.

  • ストリーミングには追加専用のデータソースが必要なため、処理で (DML ステートメントなどによる) 変更を含むソース ストリーミング テーブルからのストリーミングが必要な場合は、ソース ストリーミング テーブルを読み取るときに skipChangeCommits フラグ を設定します。 skipChangeCommits を設定すると、ソース テーブルのレコードを削除または変更するトランザクションは無視されます。処理にストリーミング テーブルが必要ない場合は、マテリアライズド ビュー (追加のみの制限がない) をターゲット テーブルとして使用できます。

ストリーミングテーブルのレコードを変更する DML ステートメントの例を次に示します。

特定の ID を持つレコードを削除します。

DELETE FROM my_streaming_table WHERE id = 123;

特定の ID でレコードを更新する:

UPDATE my_streaming_table SET name = 'Jane Doe' WHERE id = 123;