Delta Live Tables シンクを使用してレコードを外部サービスにストリームする

プレビュー

Delta Live Tables sink API は パブリック プレビュー段階です。

この記事では、Delta Live TablessinkAPI Unity CatalogHive metastoreApacheKafkaと DLT フロー と共に使用して、パイプラインによって変換されたレコードを外部データ シンク ( 管理テーブルや外部テーブル、 テーブル、イベント ストリーミング サービスAzure ( や Event Hubs など) に書き込む方法について説明します 。

Delta Live Tables シンクとは何ですか?

Delta Live Tables シンクを使用すると、 Apache Kafka や Azure Event Hubs などのイベント ストリーミング サービスや、 Unity Catalog または Hive metastoreによって管理される外部テーブルなどのターゲットに変換されたデータを書き込むことができます。 以前は、Delta Live Tables パイプラインで作成されたストリーミング テーブルと具体化されたビューは、Databricks で管理される Delta テーブルにのみ保持できました。 シンクを使用すると、Delta Live Tables パイプラインの出力を保持するためのオプションが増えました。

Delta Live Tables シンクはいつ使用する必要がありますか?

Databricks では、次の必要がある場合は Delta Live Tables シンクを使用することをお勧めします。

  • 不正検出、リアルタイム アナリティクス、顧客レコメンデーションなどの運用ユースケースを構築します。 運用上のユースケースでは、通常、Apache Kafka トピックなどのメッセージバスからデータを読み取り、データを低レイテンシで処理して、処理されたレコードをメッセージバスに書き戻します。 このアプローチにより、クラウドストレージからの書き込みや読み取りを行わないことで、レイテンシーを短縮できます。

  • 変換されたデータを Delta Live Tables フローから、外部 Delta インスタンスによって管理されるテーブル ( Unity Catalog 管理テーブル、外部テーブル、 Hive metastore テーブルなど) に書き込みます。

  • トピックなど、 ETL外部のシンクへの逆抽出、変換、ロード()を行います。DatabricksApacheKafkaこのアプローチにより、Unity Catalog テーブルやその他の Databricks で管理されるストレージの外部でデータを読み取ったり使用したりする必要があるユースケースを効果的にサポートできます。

Delta Live Tables シンク操作方法使用しますか?

注:

  • spark.readStreamdlt.read_stream を使用したストリーミング クエリのみがサポートされています。バッチ クエリはサポートされていません。

  • シンクへの書き込みに使用できるのは append_flow のみです。 apply_changesなどの他のフローはサポートされていません。

  • 完全更新を実行しても、シンク内の以前のコンピュート結果データはクリーンアップされません。つまり、再処理されたデータはシンクに追加され、既存のデータは変更されません。

イベント・データがストリーミング・ソースから Delta Live Tables パイプラインに取り込まれるとき、 Delta Live Tables 機能を使用してこのデータを処理および調整し、アペンド・フロー処理を使用して、変換されたデータ・レコードを Delta Live Tables シンクにストリームします。 このシンクは、 create_sink() 関数を使用して作成します。 create_sink 関数の使用の詳細については、シンク API リファレンスを参照してください。

Delta Live Tables シンクを実装するには、次の手順に従います。

  1. Delta Live Tables パイプラインを設定して、ストリーミング イベント データを処理し、Delta Live Tables シンクに書き込むためのデータ レコードを準備します。

  2. 優先ターゲット シンク形式を使用するように Delta Live Tables シンクを構成して作成します。

  3. 追加フローを使用して、準備されたレコードをシンクに書き込みます。

これらの手順については、このトピックの残りの部分で説明します。

Delta Live Tables パイプラインを設定して、シンクに書き込むためのレコードを準備する

最初の手順では、Delta Live Tables パイプラインを設定して、未加工のイベント ストリーム データをシンクに書き込む準備済みデータに変換します。

このプロセスをよりよく理解するために、Databricks の wikipedia-datasets サンプル データからのクリックストリーム イベント データを処理する Delta Live Tables パイプラインの次の例に従うことができます。 このパイプラインは、未加工のデータセットを解析して、Apache Spark のドキュメンテーション ページにリンクしている Wikipedia ページを特定し、そのデータを参照リンクに含まれるテーブル行のみに段階的に絞り込みます Apache_Spark.

この例では、 Delta Live Tables パイプラインは メダリオンアーキテクチャを使用して構造化されており、データをさまざまなレイヤーに整理して品質と処理効率を向上させています。

まず、JSON を使用して、データセットの生の レコードをブロンズレイヤーにロードします。Auto Loaderこの Python コードは、ソースからの未処理の生データを含む clickstream_rawという名前のストリーミングテーブルを作成する方法を示しています。

import dlt

json_path = "/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/"

@dlt.table(
 comment="The raw Wikipedia clickstream dataset, ingested from databricks-datasets.",
 table_properties={
   "quality": "bronze"
 }
)
def clickstream_raw():
 return (
   spark.readStream.format("cloudFiles").option("cloudFiles.format", "json").option("inferSchema", "true").load(json_path)
 )

このコードの実行後、データはメダリオンアーキテクチャの "bronze" (または "生データ") レベルになり、クリーンアップする必要があります。 次の手順では、データを "シルバー" レベルに絞り込み、データ型と列名をクリーンアップし、 Delta Live Tables エクスペクテーションを使用してデータの完全性を確保します。

次のコードは、ブロンズレイヤーのデータを clickstream_clean silver テーブルにクリーニングして検証することで、これを行う方法を示しています。

@dlt.table(
 comment="Wikipedia clickstream dataset with cleaned-up datatypes / column names and quality expectations.",
 table_properties={
   "quality": "silver"
 }
)
@dlt.expect("valid_current_page", "current_page_id IS NOT NULL AND current_page_title IS NOT NULL")
@dlt.expect_or_fail("valid_count", "click_count > 0")
def clickstream_clean():
 return (
   spark.readStream.table("clickstream_raw")
     .withColumn("current_page_id", expr("CAST(curr_id AS INT)"))
     .withColumn("click_count", expr("CAST(n AS INT)"))
     .withColumn("previous_page_id", expr("CAST(prev_id AS INT)"))
     .withColumnRenamed("curr_title", "current_page_title")
     .withColumnRenamed("prev_title", "previous_page_title")
     .select("current_page_id", "current_page_title", "click_count", "previous_page_id", "previous_page_title")
 )

パイプライン構造の「ゴールドレイヤー」を作成するには、クリーニングされたクリックストリームデータをフィルタリングして、参照ページが Apache_Sparkエントリを分離します。 この最後のコード例では、ターゲット シンク テーブルへの書き込みに必要な列のみを選択します。

次のコードは、ゴールドレイヤーを表す spark_referrers というテーブルを作成する方法を示しています。

@dlt.table(
 comment="A table of the most common pages that link to the Apache Spark page.",
 table_properties={
   "quality": "gold"
 }
)
def spark_referrers():
 return (
   spark.readStream.table("clickstream_clean")
     .filter(expr("current_page_title == 'Apache_Spark'"))
     .withColumnRenamed("previous_page_title", "referrer")
     .select("referrer", "current_page_id", "current_page_title", "click_count")
 )

このデータ準備プロセスが完了したら、クリーニングされたレコードが書き込まれる宛先シンクを構成する必要があります。

Delta Live Tables シンクを構成する

Databricks では、ストリーム データから処理されたレコードを書き込む 3 種類の宛先シンクがサポートされています。

  • Delta テーブルシンク

  • Apache Kafka シンク

  • Azure Event Hubs シンク

Delta、Kafka、Azure Event Hubs シンクの構成例を次に示します。

ファイルパスで Delta シンクを作成するには:

dlt.create_sink(
  name = "delta_sink",
  format = "delta",
  options = {"path": "/Volumes/catalog_name/schema_name/volume_name/path/to/data"}
)

完全修飾カタログとスキーマパスを使用してテーブル名で Delta シンクを作成するには:

dlt.create_sink(
  name = "delta_sink",
  format = "delta",
  options = { "tableName": "my_catalog.my_schema.my_table" }
)

このコードは、Apache Kafka シンクと Azure Event Hubs シンクの両方で機能します。

topic_name = "dlt-sink"
eh_namespace_name = "dlt-eventhub"
bootstrap_servers = f"{eh_namespace_name}.servicebus.windows.net:9093"
connection_string = dbutils.secrets.get(scope="secret-lab", key="kafka-connection-string")

eh_sasl = 'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule' \
  + f' required username="$ConnectionString" password="{connection_string}";'

dlt.create_sink(
name = "eh_sink",
format = "kafka",
options = {
    "kafka.bootstrap.servers": bootstrap_servers,
    "subscribe": "dlt-sink",
    "kafka.sasl.mechanism": "PLAIN",
    "kafka.security.protocol": "SASL_SSL",
    "kafka.sasl.jaas.config": eh_sasl,
    "topic": topic_name
  }
)

シンクが構成され、Delta Live Tables パイプラインが準備されたので、処理されたレコードをシンクにストリーミングを開始できます。

追加フローを使用して Delta Live Tables シンクに書き込む

シンクを構成したら、次のステップは、追加フローによって出力されるレコードのターゲットとして指定することにより、処理されたレコードをシンクに書き込むことです。 これを行うには、シンクを append_flow デコレータのtarget値として指定します。

  • Unity Catalog の管理テーブルと外部テーブルの場合は、 delta 形式を使用し、オプションでパスまたはテーブル名を指定します。 Delta Live Tables パイプラインは、Unity Catalog を使用するように構成する必要があります。

  • Apache Kafka トピックの場合は、 kafka の形式を使用し、オプションでトピック名、接続情報、認証情報を指定します。 これらは、 Spark 構造化ストリーミング Kafka シンクがサポートするオプションと同じです。 Kafka 構造化ストリーミング ライターの構成を参照してください。

  • Azure Event Hubs の場合は、 kafka 形式を使用し、オプションで Event Hubs の名前、接続情報、認証情報を指定します。 これらは、Kafka インターフェイスを使用する Spark 構造化ストリーミング Event Hubs シンクでサポートされているものと同じです。 「Microsoft Entra ID を使用したサービスプリンシパル認証」と「Event Hubs Azure」を参照してください。

  • Hive metastoreテーブルの場合は、delta 形式を使用し、オプションでパスまたはテーブル名を指定します。Delta Live Tables パイプラインは、Hive metastoreを使用するように設定する必要があります。

以下は、Delta Live Tables パイプラインによって処理されたレコードを使用して Delta、Kafka、Azure Event Hubs シンクに書き込むようにフローを設定する方法の例です。

@dlt.append_flow(name = "delta_sink_flow", target="delta_sink")
def delta_sink_flow():
return(
  spark.readStream.table("spark_referrers")
  .selectExpr("current_page_id", "referrer", "current_page_title", "click_count")
)
@dlt.append_flow(name = "kafka_sink_flow", target = "eh_sink")
def kafka_sink_flow():
return (
  spark.readStream.table("spark_referrers")
  .selectExpr("cast(current_page_id as string) as key", "to_json(struct(referrer, current_page_title, click_count)) AS value")
)

value パラメーターは、Azure Event Hubs シンクに必須です。keypartitionheaderstopic などの追加のパラメーターはオプションです。

append_flow デコレーターの詳細については、「追加フローを使用して複数のソース ストリームからストリーミング テーブルに書き込む」を参照してください。

制限事項

  • Python API のみがサポートされています。 SQL はサポートされていません。

  • spark.readStreamdlt.read_stream を使用したストリーミング クエリのみがサポートされています。バッチ クエリはサポートされていません。

  • シンクへの書き込みに使用できるのは append_flow のみです。 apply_changesなどの他のフローはサポートされておらず、Delta Live Tables データセット定義でシンクを使用することはできません。たとえば、次のものはサポートされていません。

    @table("from_sink_table")
    def fromSink():
      return read_stream("my_sink")
    
  • Delta シンクの場合、テーブル名は完全修飾である必要があります。 具体的には、 Unity Catalog 管理外部テーブルの場合、テーブル名は <catalog>.<schema>.<table> の形式である必要があります。 Hive metastoreの場合、<schema>.<table>形式である必要があります。

  • FullRefreshを実行しても、シンク内の以前のコンピュート結果データはクリーンアップされません。つまり、再処理されたデータはシンクに追加され、既存のデータは変更されません。

  • Delta Live Tables のエクスペクテーションはサポートされていません。